Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MultiAddressClientBuilder execution strategy control #2166

Merged
merged 64 commits into from
Jun 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
dc260f4
`MultiAddressClientBuilder` execution strategy control
bondolo Jan 21, 2022
20cec2a
fixes ClientEffectiveStrategyTest for basic Single and Multi builders
bondolo Apr 1, 2022
65ceda1
fixes ClientEffectiveStrategyTest for multi builder with initializer
bondolo Apr 1, 2022
61814d1
pre-final cleanup
bondolo Apr 1, 2022
09e2ea9
documentation improvements
bondolo Apr 1, 2022
db7f892
Handle single address builder overrides of offloadNone()
bondolo Apr 7, 2022
6392624
checkstyle nits
bondolo Apr 7, 2022
24d6216
combine client api test cases
bondolo Apr 7, 2022
dd7a88d
review feedback
bondolo Apr 13, 2022
074584a
remove DefaultClientGroup changes from PR
bondolo Apr 13, 2022
2be80b6
test cleanup
bondolo Apr 13, 2022
b2dbc55
Merge branch 'main' into multiaddress-initializer-strategy
bondolo Apr 14, 2022
4ade1dc
move FilterableClientToClient back
bondolo Apr 14, 2022
b0d2c3e
additional review feedback
bondolo Apr 15, 2022
0f7c444
test cleanup
bondolo Apr 15, 2022
3e151d6
test cleanup
bondolo Apr 15, 2022
035fdd3
fix multi-address builder strategy computation
bondolo Apr 15, 2022
030833a
checkstyle nit
bondolo Apr 15, 2022
5cb3a92
more test simplification
bondolo Apr 15, 2022
efba308
even more review feedback
bondolo Apr 15, 2022
e541a50
Merge branch 'main' into multiaddress-initializer-strategy
bondolo Apr 18, 2022
47ec7f8
test improvements and suggestions from Idel's review
bondolo Apr 18, 2022
8f0819d
allow send offload to execute on application thread
bondolo Apr 19, 2022
330824e
allow send offload to execute on application thread
bondolo Apr 19, 2022
97453d9
restore second request
bondolo Apr 19, 2022
c7d00bd
enhance visibility in tests
idelpivnitskiy Apr 20, 2022
522e7e6
more debugging
idelpivnitskiy Apr 20, 2022
36def49
more debugging 2
idelpivnitskiy Apr 20, 2022
9e6530d
add timestamps
idelpivnitskiy Apr 21, 2022
90d1728
Merge branch 'apple:main' into multiaddress-initializer-strategy
bondolo Apr 21, 2022
2c7e1df
change order or ClientApi
idelpivnitskiy Apr 21, 2022
5d95094
only allow application thread if not offloading
bondolo Apr 20, 2022
be76acf
use local for simplification
bondolo Apr 21, 2022
1419244
revert unintended changes
bondolo Apr 21, 2022
60040b2
capture where context map created and assigned
idelpivnitskiy Apr 22, 2022
efaa5a5
capture requestContext id too
idelpivnitskiy Apr 22, 2022
9d6e6a4
Remove ExecutionMode.CONCURRENT
idelpivnitskiy Apr 22, 2022
ef49a05
Revert intermediate debugging
idelpivnitskiy Apr 23, 2022
d670415
Remove intermediate debug logging
idelpivnitskiy Apr 23, 2022
a33671b
reorder client api
idelpivnitskiy Apr 23, 2022
312c836
revert `ExecutionMode.CONCURRENT`
idelpivnitskiy Apr 23, 2022
34a468d
small refactoring
idelpivnitskiy Apr 23, 2022
3079781
small refactoring 2
idelpivnitskiy Apr 23, 2022
f591a1c
small refactoring 3
idelpivnitskiy Apr 23, 2022
79b922c
rename filter, add javadoc to clarify its behavior
idelpivnitskiy Apr 23, 2022
da80a7e
Add assertion checking for conversion reducing offloading
bondolo Apr 25, 2022
b45b45d
improve executionStrategy() documentation
bondolo Apr 27, 2022
09e63bd
improved javadoc
bondolo Apr 29, 2022
d5a83f0
documentation refinement for discussion
bondolo May 18, 2022
acd2535
Merge branch 'main' into multiaddress-initializer-strategy
bondolo Jun 6, 2022
dfa383b
Merge branch 'main' into multiaddress-initializer-strategy
bondolo Jun 7, 2022
8380d2a
Merge branch 'main' into multiaddress-initializer-strategy
bondolo Jun 7, 2022
cead445
Merge branch 'main' into multiaddress-initializer-strategy
bondolo Jun 8, 2022
945762a
Merge branch 'main' into multiaddress-initializer-strategy
bondolo Jun 8, 2022
10da51b
Merge branch 'main' into multiaddress-initializer-strategy
bondolo Jun 9, 2022
7652859
Use current strategy for merging with offloadNone()
bondolo Jun 9, 2022
c52950c
adjust everything to current "builder takes precedece" rule
bondolo Jun 10, 2022
aebccc9
documentation matches behaviour and fix multi-offloadNone overrid
bondolo Jun 10, 2022
55c5095
fix javadoc nit
bondolo Jun 10, 2022
239b3aa
remove redundant modification of the request execution strategy
bondolo Jun 10, 2022
b6a4c15
checkstyle nit
bondolo Jun 10, 2022
c385897
remove assertion of incompatible behavior
bondolo Jun 15, 2022
81f818f
Merge branch 'main' into multiaddress-initializer-strategy
bondolo Jun 15, 2022
ee1ba16
final review feedback
bondolo Jun 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,42 @@ default SingleAddressInitializer<U, R> append(SingleAddressInitializer<U, R> toA
@Override
MultiAddressHttpClientBuilder<U, R> executor(Executor executor);

/**
* {@inheritDoc}
*
* <p>Provides the base execution strategy for all clients created from this builder and the default strategy for
* the {@link SingleAddressHttpClientBuilder} used to construct client instances. The
* {@link #initializer(SingleAddressInitializer)} may be used for some customization of the execution strategy for a
* specific single address client instance, but may not reduce the offloading to be performed. Specifically, the
* initializer may introduce additional offloading via
* {@link SingleAddressHttpClientBuilder#executionStrategy(HttpExecutionStrategy)} and may add filters which
* influence the computed execution strategy.
*
* <p>Specifying an execution strategy will affect the offloading used during the execution of client requests:
*
* <dl>
* <dt>Unspecified or {@link HttpExecutionStrategies#defaultStrategy()}
* <dd>The resulting client instances will use the default safe strategy for each API variant and
* {@link SingleAddressHttpClientBuilder} instances generated will also have default strategy.
*
* <dt>{@link HttpExecutionStrategies#offloadNone()}
* (or deprecated {@link HttpExecutionStrategies#offloadNever()})
* <dd>{@link SingleAddressHttpClientBuilder} instances created by the client will have a strategy of
* {@link HttpExecutionStrategies#offloadNone()}. {@link HttpExecutionStrategies#offloadNone()} execution
* strategy requires that filters and asynchronous callbacks
* <strong style="text-transform: uppercase;">must not</strong> ever block during the execution of client
* requests. An {@link #initializer(SingleAddressInitializer) initializer} may override to add offloads using
* {@link SingleAddressHttpClientBuilder#executionStrategy(HttpExecutionStrategy)}.
*
* <dt>A custom execution strategy ({@link HttpExecutionStrategies#customStrategyBuilder()}) or
* {@link HttpExecutionStrategies#offloadAll()}
* <dd>{@link SingleAddressHttpClientBuilder} instances created by the client will start with the provided
* strategy and may add additional offloading as required by added filters.
* </dl>
*
* @param strategy {@inheritDoc}
* @return {@inheritDoc}
*/
@Override
MultiAddressHttpClientBuilder<U, R> executionStrategy(HttpExecutionStrategy strategy);

Expand All @@ -90,7 +126,10 @@ default MultiAddressHttpClientBuilder<U, R> headersFactory(HttpHeadersFactory he
/**
* Set a function which can customize options for each {@link StreamingHttpClient} that is built.
* @param initializer Initializes the {@link SingleAddressHttpClientBuilder} used to build new
* {@link StreamingHttpClient}s.
* {@link StreamingHttpClient}s. See {@link #executionStrategy(HttpExecutionStrategy)} for discussion of
* restrictions on the use of {@link SingleAddressHttpClientBuilder#executionStrategy(HttpExecutionStrategy)}
* within an initializer.
*
* @return {@code this}
*/
MultiAddressHttpClientBuilder<U, R> initializer(SingleAddressInitializer<U, R> initializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,42 @@ SingleAddressHttpClientBuilder<U, R> appendConnectionFilter(
@Override
SingleAddressHttpClientBuilder<U, R> executor(Executor executor);

/**
* {@inheritDoc}
*
* <p>Specifying an execution strategy affects the offloading used during execution of client requests:
*
* <dl>
* <dt>Unspecified or {@link HttpExecutionStrategies#defaultStrategy()}
* <dd>Execution of client requests will use a safe (non-blocking) execution strategy appropriate for the
* client API used and the filters added. Blocking is always safe as all potentially blocking paths are
* offloaded. Each client API variant (async/blocking streaming/aggregate) requires a specific execution
* strategy to avoid blocking the event-loop and filters added via
* {@link #appendClientFilter(StreamingHttpClientFilterFactory)},
* {@link #appendConnectionFilter(StreamingHttpConnectionFilterFactory)}, or
* {@link #appendConnectionFactoryFilter(ConnectionFactoryFilter)}, etc. may also require offloading.
* The execution strategy for execution of client requests will be computed based on the client API in use and
* {@link HttpExecutionStrategyInfluencer#requiredOffloads()} of added the filters.
*
* <dt>{@link HttpExecutionStrategies#offloadNone()}
* (or deprecated {@link HttpExecutionStrategies#offloadNever()})
* <dd>No offloading will be used during execution of client requests regardless of the client API used or the
* influence of added filters. Filters and asynchronous callbacks
* <strong style="text-transform: uppercase;">must not</strong> ever block during the execution of client
* requests.
*
* <dt>A custom execution strategy ({@link HttpExecutionStrategies#customStrategyBuilder()}) or
* {@link HttpExecutionStrategies#offloadAll()}
* <dd>The specified execution strategy will be used for executing client requests rather than the client
* API's default safe strategy. Like with the default strategy, the actual execution strategy used is computed
* from the provided strategy and the execution strategies required by added filters. Filters and asynchronous
* callbacks <strong style="text-transform: uppercase;">MAY</strong> only block during the offloaded portions of
* the client request execution.
* </dl>
*
* @param strategy {@inheritDoc}
* @return {@inheritDoc}
*/
@Override
SingleAddressHttpClientBuilder<U, R> executionStrategy(HttpExecutionStrategy strategy);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ final class StreamingHttpClientToBlockingHttpClient implements BlockingHttpClien
private final HttpRequestResponseFactory reqRespFactory;

StreamingHttpClientToBlockingHttpClient(final StreamingHttpClient client, final HttpExecutionStrategy strategy) {
this.strategy = defaultStrategy() == strategy ?
DEFAULT_BLOCKING_CONNECTION_STRATEGY : strategy;
this.strategy = defaultStrategy() == strategy ? DEFAULT_BLOCKING_CONNECTION_STRATEGY : strategy;
this.client = client;
context = new DelegatingHttpExecutionContext(client.executionContext()) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.api.RequestResponseFactories.toAggregated;
import static io.servicetalk.http.api.StreamingHttpConnectionToHttpConnection.DEFAULT_CONNECTION_STRATEGY;
import static io.servicetalk.http.api.StreamingHttpConnectionToHttpConnection.DEFAULT_ASYNC_CONNECTION_STRATEGY;
import static java.util.Objects.requireNonNull;

final class StreamingHttpClientToHttpClient implements HttpClient {
Expand All @@ -32,7 +32,7 @@ final class StreamingHttpClientToHttpClient implements HttpClient {
private final HttpRequestResponseFactory reqRespFactory;

StreamingHttpClientToHttpClient(final StreamingHttpClient client, final HttpExecutionStrategy strategy) {
this.strategy = defaultStrategy() == strategy ? DEFAULT_CONNECTION_STRATEGY : strategy;
this.strategy = defaultStrategy() == strategy ? DEFAULT_ASYNC_CONNECTION_STRATEGY : strategy;
this.client = client;
context = new DelegatingHttpExecutionContext(client.executionContext()) {
@Override
Expand Down Expand Up @@ -118,7 +118,7 @@ static final class ReservedStreamingHttpConnectionToReservedHttpConnection imple

ReservedStreamingHttpConnectionToReservedHttpConnection(final ReservedStreamingHttpConnection connection,
final HttpExecutionStrategy strategy) {
this(connection, defaultStrategy() == strategy ? DEFAULT_CONNECTION_STRATEGY : strategy,
this(connection, defaultStrategy() == strategy ? DEFAULT_ASYNC_CONNECTION_STRATEGY : strategy,
toAggregated(connection));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ final class StreamingHttpConnectionToHttpConnection implements HttpConnection {
* For aggregation, we invoke the user callback (Single from client#request()) after the payload is completed,
* hence we need to offload data.
*/
static final HttpExecutionStrategy DEFAULT_CONNECTION_STRATEGY = OFFLOAD_RECEIVE_DATA_EVENT_STRATEGY;
static final HttpExecutionStrategy DEFAULT_ASYNC_CONNECTION_STRATEGY = OFFLOAD_RECEIVE_DATA_EVENT_STRATEGY;
private final StreamingHttpConnection connection;
private final HttpExecutionStrategy strategy;
private final HttpConnectionContext context;
Expand All @@ -37,7 +37,7 @@ final class StreamingHttpConnectionToHttpConnection implements HttpConnection {

StreamingHttpConnectionToHttpConnection(final StreamingHttpConnection connection,
final HttpExecutionStrategy strategy) {
this.strategy = defaultStrategy() == strategy ? DEFAULT_CONNECTION_STRATEGY : strategy;
this.strategy = defaultStrategy() == strategy ? DEFAULT_ASYNC_CONNECTION_STRATEGY : strategy;
this.connection = connection;
final HttpConnectionContext originalCtx = connection.connectionContext();
executionContext = new DelegatingHttpExecutionContext(connection.executionContext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.SubscribableCompletable;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.DefaultHttpHeadersFactory;
import io.servicetalk.http.api.DefaultStreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.FilterableReservedStreamingHttpConnection;
import io.servicetalk.http.api.FilterableStreamingHttpClient;
import io.servicetalk.http.api.HttpContextKeys;
import io.servicetalk.http.api.HttpExecutionContext;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpHeadersFactory;
import io.servicetalk.http.api.HttpRequestMetaData;
Expand All @@ -37,8 +40,11 @@
import io.servicetalk.http.api.RedirectConfig;
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import io.servicetalk.http.api.StreamingHttpClient;
import io.servicetalk.http.api.StreamingHttpClientFilter;
import io.servicetalk.http.api.StreamingHttpClientFilterFactory;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpRequester;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.utils.RedirectingHttpRequesterFilter;
Expand All @@ -63,6 +69,10 @@
import static io.servicetalk.concurrent.api.AsyncCloseables.toListenableAsyncCloseable;
import static io.servicetalk.concurrent.api.Single.defer;
import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverCompleteFromSource;
import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.api.HttpExecutionStrategies.offloadAll;
import static io.servicetalk.http.api.HttpExecutionStrategies.offloadNone;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.setExecutionContext;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -106,12 +116,12 @@ public StreamingHttpClient buildStreaming() {
final CompositeCloseable closeables = newCompositeCloseable();
try {
final HttpExecutionContext executionContext = executionContextBuilder.build();
final ClientFactory clientFactory = new ClientFactory(builderFactory, executionContext,
singleAddressInitializer);
final ClientFactory clientFactory =
new ClientFactory(builderFactory, executionContext, singleAddressInitializer);
final CachingKeyFactory keyFactory = closeables.prepend(new CachingKeyFactory());
final HttpHeadersFactory headersFactory = this.headersFactory;
FilterableStreamingHttpClient urlClient = closeables.prepend(
new StreamingUrlHttpClient(executionContext, clientFactory, keyFactory,
new StreamingUrlHttpClient(executionContext, keyFactory, clientFactory,
new DefaultStreamingHttpRequestResponseFactory(executionContext.bufferAllocator(),
headersFactory != null ? headersFactory : DefaultHttpHeadersFactory.INSTANCE,
HTTP_1_1)));
Expand Down Expand Up @@ -240,6 +250,8 @@ public StreamingHttpClient apply(final UrlKey urlKey) {
builder.sslConfig(DEFAULT_CLIENT_SSL_CONFIG);
}

builder.appendClientFilter(HttpExecutionStrategyUpdater.INSTANCE);

if (singleAddressInitializer != null) {
singleAddressInitializer.initialize(urlKey.scheme, urlKey.hostAndPort, builder);
}
Expand All @@ -248,6 +260,63 @@ public StreamingHttpClient apply(final UrlKey urlKey) {
}
}

private static void singleClientStrategyUpdate(ContextMap context, HttpExecutionStrategy singleStrategy) {
HttpExecutionStrategy requestStrategy = context.getOrDefault(HTTP_EXECUTION_STRATEGY_KEY, defaultStrategy());
assert null != requestStrategy : "Request strategy unexpectedly null";
HttpExecutionStrategy useStrategy = defaultStrategy() == requestStrategy ?
// For all apis except async streaming default conversion has already been done.
// This is the default to required strategy resolution for the async streaming client.
offloadAll() :
defaultStrategy() == singleStrategy || !singleStrategy.hasOffloads() ?
// single client is default or has no *additional* offloads
requestStrategy :
// add single client offloads to existing strategy
requestStrategy.merge(singleStrategy);

if (useStrategy != requestStrategy) {
LOGGER.debug("Request strategy {} changes to {}. SingleAddressClient strategy: {}",
requestStrategy, useStrategy, singleStrategy);
context.put(HTTP_EXECUTION_STRATEGY_KEY, useStrategy);
}
}

/**
* When request transitions from the multi-address level to the single-address level, this filter will make sure
* that any missing offloading required by the selected single-address client will be applied for the request
* execution. This filter never reduces offloading, it can only add missing offloading flags. Users who want to
* execute a request without offloading must specify {@link HttpExecutionStrategies#offloadNone()} strategy at the
* {@link MultiAddressHttpClientBuilder} or explicitly set the required strategy at request context with
* {@link HttpContextKeys#HTTP_EXECUTION_STRATEGY_KEY}.
*/
private static final class HttpExecutionStrategyUpdater implements StreamingHttpClientFilterFactory {

static final StreamingHttpClientFilterFactory INSTANCE = new HttpExecutionStrategyUpdater();

private HttpExecutionStrategyUpdater() {
// Singleton
}

@Override
public StreamingHttpClientFilter create(final FilterableStreamingHttpClient client) {
return new StreamingHttpClientFilter(client) {
@Override
protected Single<StreamingHttpResponse> request(
final StreamingHttpRequester delegate, final StreamingHttpRequest request) {
return defer(() -> {
singleClientStrategyUpdate(request.context(), client.executionContext().executionStrategy());

return delegate.request(request);
});
}
};
}

@Override
public HttpExecutionStrategy requiredOffloads() {
return offloadNone();
}
}

private static final class StreamingUrlHttpClient implements FilterableStreamingHttpClient {
private final HttpExecutionContext executionContext;
private final StreamingHttpRequestResponseFactory reqRespFactory;
Expand All @@ -256,8 +325,7 @@ private static final class StreamingUrlHttpClient implements FilterableStreaming
private final ListenableAsyncCloseable closeable;

StreamingUrlHttpClient(final HttpExecutionContext executionContext,
final Function<UrlKey, FilterableStreamingHttpClient> clientFactory,
final CachingKeyFactory keyFactory,
final CachingKeyFactory keyFactory, final ClientFactory clientFactory,
final StreamingHttpRequestResponseFactory reqRespFactory) {
this.reqRespFactory = requireNonNull(reqRespFactory);
this.group = ClientGroup.from(clientFactory);
Expand All @@ -278,7 +346,10 @@ public Single<? extends FilterableReservedStreamingHttpConnection> reserveConnec
final HttpRequestMetaData metaData) {
return defer(() -> {
try {
return selectClient(metaData).reserveConnection(metaData).shareContextOnSubscribe();
FilterableStreamingHttpClient singleClient = selectClient(metaData);
singleClientStrategyUpdate(metaData.context(), singleClient.executionContext().executionStrategy());

return singleClient.reserveConnection(metaData).shareContextOnSubscribe();
} catch (Throwable t) {
return Single.<FilterableReservedStreamingHttpConnection>failed(t).shareContextOnSubscribe();
}
Expand Down
Loading