diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/MultiAddressHttpClientBuilder.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/MultiAddressHttpClientBuilder.java index 89b49d7ebf..934dce1830 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/MultiAddressHttpClientBuilder.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/MultiAddressHttpClientBuilder.java @@ -69,6 +69,42 @@ default SingleAddressInitializer append(SingleAddressInitializer toA @Override MultiAddressHttpClientBuilder executor(Executor executor); + /** + * {@inheritDoc} + * + *

Provides the base execution strategy for all clients created from this builder and the default strategy for + * the {@link SingleAddressHttpClientBuilder} used to construct client instances. The + * {@link #initializer(SingleAddressInitializer)} may be used for some customization of the execution strategy for a + * specific single address client instance, but may not reduce the offloading to be performed. Specifically, the + * initializer may introduce additional offloading via + * {@link SingleAddressHttpClientBuilder#executionStrategy(HttpExecutionStrategy)} and may add filters which + * influence the computed execution strategy. + * + *

Specifying an execution strategy will affect the offloading used during the execution of client requests: + * + *

+ *
Unspecified or {@link HttpExecutionStrategies#defaultStrategy()} + *
The resulting client instances will use the default safe strategy for each API variant and + * {@link SingleAddressHttpClientBuilder} instances generated will also have default strategy. + * + *
{@link HttpExecutionStrategies#offloadNone()} + * (or deprecated {@link HttpExecutionStrategies#offloadNever()}) + *
{@link SingleAddressHttpClientBuilder} instances created by the client will have a strategy of + * {@link HttpExecutionStrategies#offloadNone()}. {@link HttpExecutionStrategies#offloadNone()} execution + * strategy requires that filters and asynchronous callbacks + * must not ever block during the execution of client + * requests. An {@link #initializer(SingleAddressInitializer) initializer} may override to add offloads using + * {@link SingleAddressHttpClientBuilder#executionStrategy(HttpExecutionStrategy)}. + * + *
A custom execution strategy ({@link HttpExecutionStrategies#customStrategyBuilder()}) or + * {@link HttpExecutionStrategies#offloadAll()} + *
{@link SingleAddressHttpClientBuilder} instances created by the client will start with the provided + * strategy and may add additional offloading as required by added filters. + *
+ * + * @param strategy {@inheritDoc} + * @return {@inheritDoc} + */ @Override MultiAddressHttpClientBuilder executionStrategy(HttpExecutionStrategy strategy); @@ -90,7 +126,10 @@ default MultiAddressHttpClientBuilder headersFactory(HttpHeadersFactory he /** * Set a function which can customize options for each {@link StreamingHttpClient} that is built. * @param initializer Initializes the {@link SingleAddressHttpClientBuilder} used to build new - * {@link StreamingHttpClient}s. + * {@link StreamingHttpClient}s. See {@link #executionStrategy(HttpExecutionStrategy)} for discussion of + * restrictions on the use of {@link SingleAddressHttpClientBuilder#executionStrategy(HttpExecutionStrategy)} + * within an initializer. + * * @return {@code this} */ MultiAddressHttpClientBuilder initializer(SingleAddressInitializer initializer); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/SingleAddressHttpClientBuilder.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/SingleAddressHttpClientBuilder.java index b7cbcd3e85..e9ce2cd7dc 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/SingleAddressHttpClientBuilder.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/SingleAddressHttpClientBuilder.java @@ -172,6 +172,42 @@ SingleAddressHttpClientBuilder appendConnectionFilter( @Override SingleAddressHttpClientBuilder executor(Executor executor); + /** + * {@inheritDoc} + * + *

Specifying an execution strategy affects the offloading used during execution of client requests: + * + *

+ *
Unspecified or {@link HttpExecutionStrategies#defaultStrategy()} + *
Execution of client requests will use a safe (non-blocking) execution strategy appropriate for the + * client API used and the filters added. Blocking is always safe as all potentially blocking paths are + * offloaded. Each client API variant (async/blocking streaming/aggregate) requires a specific execution + * strategy to avoid blocking the event-loop and filters added via + * {@link #appendClientFilter(StreamingHttpClientFilterFactory)}, + * {@link #appendConnectionFilter(StreamingHttpConnectionFilterFactory)}, or + * {@link #appendConnectionFactoryFilter(ConnectionFactoryFilter)}, etc. may also require offloading. + * The execution strategy for execution of client requests will be computed based on the client API in use and + * {@link HttpExecutionStrategyInfluencer#requiredOffloads()} of added the filters. + * + *
{@link HttpExecutionStrategies#offloadNone()} + * (or deprecated {@link HttpExecutionStrategies#offloadNever()}) + *
No offloading will be used during execution of client requests regardless of the client API used or the + * influence of added filters. Filters and asynchronous callbacks + * must not ever block during the execution of client + * requests. + * + *
A custom execution strategy ({@link HttpExecutionStrategies#customStrategyBuilder()}) or + * {@link HttpExecutionStrategies#offloadAll()} + *
The specified execution strategy will be used for executing client requests rather than the client + * API's default safe strategy. Like with the default strategy, the actual execution strategy used is computed + * from the provided strategy and the execution strategies required by added filters. Filters and asynchronous + * callbacks MAY only block during the offloaded portions of + * the client request execution. + *
+ * + * @param strategy {@inheritDoc} + * @return {@inheritDoc} + */ @Override SingleAddressHttpClientBuilder executionStrategy(HttpExecutionStrategy strategy); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToBlockingHttpClient.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToBlockingHttpClient.java index dce3340fa8..bdc4e24f7c 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToBlockingHttpClient.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToBlockingHttpClient.java @@ -31,8 +31,7 @@ final class StreamingHttpClientToBlockingHttpClient implements BlockingHttpClien private final HttpRequestResponseFactory reqRespFactory; StreamingHttpClientToBlockingHttpClient(final StreamingHttpClient client, final HttpExecutionStrategy strategy) { - this.strategy = defaultStrategy() == strategy ? - DEFAULT_BLOCKING_CONNECTION_STRATEGY : strategy; + this.strategy = defaultStrategy() == strategy ? DEFAULT_BLOCKING_CONNECTION_STRATEGY : strategy; this.client = client; context = new DelegatingHttpExecutionContext(client.executionContext()) { @Override diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToHttpClient.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToHttpClient.java index cb32c3fcd8..790e0d019f 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToHttpClient.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToHttpClient.java @@ -22,7 +22,7 @@ import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY; import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy; import static io.servicetalk.http.api.RequestResponseFactories.toAggregated; -import static io.servicetalk.http.api.StreamingHttpConnectionToHttpConnection.DEFAULT_CONNECTION_STRATEGY; +import static io.servicetalk.http.api.StreamingHttpConnectionToHttpConnection.DEFAULT_ASYNC_CONNECTION_STRATEGY; import static java.util.Objects.requireNonNull; final class StreamingHttpClientToHttpClient implements HttpClient { @@ -32,7 +32,7 @@ final class StreamingHttpClientToHttpClient implements HttpClient { private final HttpRequestResponseFactory reqRespFactory; StreamingHttpClientToHttpClient(final StreamingHttpClient client, final HttpExecutionStrategy strategy) { - this.strategy = defaultStrategy() == strategy ? DEFAULT_CONNECTION_STRATEGY : strategy; + this.strategy = defaultStrategy() == strategy ? DEFAULT_ASYNC_CONNECTION_STRATEGY : strategy; this.client = client; context = new DelegatingHttpExecutionContext(client.executionContext()) { @Override @@ -118,7 +118,7 @@ static final class ReservedStreamingHttpConnectionToReservedHttpConnection imple ReservedStreamingHttpConnectionToReservedHttpConnection(final ReservedStreamingHttpConnection connection, final HttpExecutionStrategy strategy) { - this(connection, defaultStrategy() == strategy ? DEFAULT_CONNECTION_STRATEGY : strategy, + this(connection, defaultStrategy() == strategy ? DEFAULT_ASYNC_CONNECTION_STRATEGY : strategy, toAggregated(connection)); } diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpConnectionToHttpConnection.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpConnectionToHttpConnection.java index f0b70a43ed..4c80a8d035 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpConnectionToHttpConnection.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpConnectionToHttpConnection.java @@ -28,7 +28,7 @@ final class StreamingHttpConnectionToHttpConnection implements HttpConnection { * For aggregation, we invoke the user callback (Single from client#request()) after the payload is completed, * hence we need to offload data. */ - static final HttpExecutionStrategy DEFAULT_CONNECTION_STRATEGY = OFFLOAD_RECEIVE_DATA_EVENT_STRATEGY; + static final HttpExecutionStrategy DEFAULT_ASYNC_CONNECTION_STRATEGY = OFFLOAD_RECEIVE_DATA_EVENT_STRATEGY; private final StreamingHttpConnection connection; private final HttpExecutionStrategy strategy; private final HttpConnectionContext context; @@ -37,7 +37,7 @@ final class StreamingHttpConnectionToHttpConnection implements HttpConnection { StreamingHttpConnectionToHttpConnection(final StreamingHttpConnection connection, final HttpExecutionStrategy strategy) { - this.strategy = defaultStrategy() == strategy ? DEFAULT_CONNECTION_STRATEGY : strategy; + this.strategy = defaultStrategy() == strategy ? DEFAULT_ASYNC_CONNECTION_STRATEGY : strategy; this.connection = connection; final HttpConnectionContext originalCtx = connection.connectionContext(); executionContext = new DelegatingHttpExecutionContext(connection.executionContext()) { diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultMultiAddressUrlHttpClientBuilder.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultMultiAddressUrlHttpClientBuilder.java index 9e2dec7488..606e7be654 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultMultiAddressUrlHttpClientBuilder.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultMultiAddressUrlHttpClientBuilder.java @@ -24,11 +24,14 @@ import io.servicetalk.concurrent.api.ListenableAsyncCloseable; import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.api.internal.SubscribableCompletable; +import io.servicetalk.context.api.ContextMap; import io.servicetalk.http.api.DefaultHttpHeadersFactory; import io.servicetalk.http.api.DefaultStreamingHttpRequestResponseFactory; import io.servicetalk.http.api.FilterableReservedStreamingHttpConnection; import io.servicetalk.http.api.FilterableStreamingHttpClient; +import io.servicetalk.http.api.HttpContextKeys; import io.servicetalk.http.api.HttpExecutionContext; +import io.servicetalk.http.api.HttpExecutionStrategies; import io.servicetalk.http.api.HttpExecutionStrategy; import io.servicetalk.http.api.HttpHeadersFactory; import io.servicetalk.http.api.HttpRequestMetaData; @@ -37,8 +40,11 @@ import io.servicetalk.http.api.RedirectConfig; import io.servicetalk.http.api.SingleAddressHttpClientBuilder; import io.servicetalk.http.api.StreamingHttpClient; +import io.servicetalk.http.api.StreamingHttpClientFilter; +import io.servicetalk.http.api.StreamingHttpClientFilterFactory; import io.servicetalk.http.api.StreamingHttpRequest; import io.servicetalk.http.api.StreamingHttpRequestResponseFactory; +import io.servicetalk.http.api.StreamingHttpRequester; import io.servicetalk.http.api.StreamingHttpResponse; import io.servicetalk.http.api.StreamingHttpResponseFactory; import io.servicetalk.http.utils.RedirectingHttpRequesterFilter; @@ -63,6 +69,10 @@ import static io.servicetalk.concurrent.api.AsyncCloseables.toListenableAsyncCloseable; import static io.servicetalk.concurrent.api.Single.defer; import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverCompleteFromSource; +import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY; +import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy; +import static io.servicetalk.http.api.HttpExecutionStrategies.offloadAll; +import static io.servicetalk.http.api.HttpExecutionStrategies.offloadNone; import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1; import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.setExecutionContext; import static java.util.Objects.requireNonNull; @@ -106,12 +116,12 @@ public StreamingHttpClient buildStreaming() { final CompositeCloseable closeables = newCompositeCloseable(); try { final HttpExecutionContext executionContext = executionContextBuilder.build(); - final ClientFactory clientFactory = new ClientFactory(builderFactory, executionContext, - singleAddressInitializer); + final ClientFactory clientFactory = + new ClientFactory(builderFactory, executionContext, singleAddressInitializer); final CachingKeyFactory keyFactory = closeables.prepend(new CachingKeyFactory()); final HttpHeadersFactory headersFactory = this.headersFactory; FilterableStreamingHttpClient urlClient = closeables.prepend( - new StreamingUrlHttpClient(executionContext, clientFactory, keyFactory, + new StreamingUrlHttpClient(executionContext, keyFactory, clientFactory, new DefaultStreamingHttpRequestResponseFactory(executionContext.bufferAllocator(), headersFactory != null ? headersFactory : DefaultHttpHeadersFactory.INSTANCE, HTTP_1_1))); @@ -240,6 +250,8 @@ public StreamingHttpClient apply(final UrlKey urlKey) { builder.sslConfig(DEFAULT_CLIENT_SSL_CONFIG); } + builder.appendClientFilter(HttpExecutionStrategyUpdater.INSTANCE); + if (singleAddressInitializer != null) { singleAddressInitializer.initialize(urlKey.scheme, urlKey.hostAndPort, builder); } @@ -248,6 +260,63 @@ public StreamingHttpClient apply(final UrlKey urlKey) { } } + private static void singleClientStrategyUpdate(ContextMap context, HttpExecutionStrategy singleStrategy) { + HttpExecutionStrategy requestStrategy = context.getOrDefault(HTTP_EXECUTION_STRATEGY_KEY, defaultStrategy()); + assert null != requestStrategy : "Request strategy unexpectedly null"; + HttpExecutionStrategy useStrategy = defaultStrategy() == requestStrategy ? + // For all apis except async streaming default conversion has already been done. + // This is the default to required strategy resolution for the async streaming client. + offloadAll() : + defaultStrategy() == singleStrategy || !singleStrategy.hasOffloads() ? + // single client is default or has no *additional* offloads + requestStrategy : + // add single client offloads to existing strategy + requestStrategy.merge(singleStrategy); + + if (useStrategy != requestStrategy) { + LOGGER.debug("Request strategy {} changes to {}. SingleAddressClient strategy: {}", + requestStrategy, useStrategy, singleStrategy); + context.put(HTTP_EXECUTION_STRATEGY_KEY, useStrategy); + } + } + + /** + * When request transitions from the multi-address level to the single-address level, this filter will make sure + * that any missing offloading required by the selected single-address client will be applied for the request + * execution. This filter never reduces offloading, it can only add missing offloading flags. Users who want to + * execute a request without offloading must specify {@link HttpExecutionStrategies#offloadNone()} strategy at the + * {@link MultiAddressHttpClientBuilder} or explicitly set the required strategy at request context with + * {@link HttpContextKeys#HTTP_EXECUTION_STRATEGY_KEY}. + */ + private static final class HttpExecutionStrategyUpdater implements StreamingHttpClientFilterFactory { + + static final StreamingHttpClientFilterFactory INSTANCE = new HttpExecutionStrategyUpdater(); + + private HttpExecutionStrategyUpdater() { + // Singleton + } + + @Override + public StreamingHttpClientFilter create(final FilterableStreamingHttpClient client) { + return new StreamingHttpClientFilter(client) { + @Override + protected Single request( + final StreamingHttpRequester delegate, final StreamingHttpRequest request) { + return defer(() -> { + singleClientStrategyUpdate(request.context(), client.executionContext().executionStrategy()); + + return delegate.request(request); + }); + } + }; + } + + @Override + public HttpExecutionStrategy requiredOffloads() { + return offloadNone(); + } + } + private static final class StreamingUrlHttpClient implements FilterableStreamingHttpClient { private final HttpExecutionContext executionContext; private final StreamingHttpRequestResponseFactory reqRespFactory; @@ -256,8 +325,7 @@ private static final class StreamingUrlHttpClient implements FilterableStreaming private final ListenableAsyncCloseable closeable; StreamingUrlHttpClient(final HttpExecutionContext executionContext, - final Function clientFactory, - final CachingKeyFactory keyFactory, + final CachingKeyFactory keyFactory, final ClientFactory clientFactory, final StreamingHttpRequestResponseFactory reqRespFactory) { this.reqRespFactory = requireNonNull(reqRespFactory); this.group = ClientGroup.from(clientFactory); @@ -278,7 +346,10 @@ public Single reserveConnec final HttpRequestMetaData metaData) { return defer(() -> { try { - return selectClient(metaData).reserveConnection(metaData).shareContextOnSubscribe(); + FilterableStreamingHttpClient singleClient = selectClient(metaData); + singleClientStrategyUpdate(metaData.context(), singleClient.executionContext().executionStrategy()); + + return singleClient.reserveConnection(metaData).shareContextOnSubscribe(); } catch (Throwable t) { return Single.failed(t).shareContextOnSubscribe(); } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ClientEffectiveStrategyTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ClientEffectiveStrategyTest.java index b9a4bae10e..9cb3135907 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ClientEffectiveStrategyTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ClientEffectiveStrategyTest.java @@ -95,7 +95,7 @@ import static org.hamcrest.Matchers.emptyString; import static org.hamcrest.Matchers.not; -@Execution(ExecutionMode.CONCURRENT) +@Execution(ExecutionMode.SAME_THREAD) class ClientEffectiveStrategyTest { @RegisterExtension @@ -121,7 +121,30 @@ private enum BuilderType { */ SINGLE_BUILDER, - /* MULTI_BUILDER strategies are currently broken, see https://github.com/apple/servicetalk/pull/2166 */ + /** + * Test execution strategy applied to multi client builder, + * {@link MultiAddressHttpClientBuilder#executionStrategy(HttpExecutionStrategy)}. + * Single client builder inherits multi client execution context strategy. + */ + MULTI_BUILDER, + + /** + * Multi client builder uses default strategy ({@link HttpExecutionStrategies#defaultStrategy()}). + * Single client builder inherits multi client builder execution context strategy, + * {@link HttpExecutionStrategies#defaultStrategy()}. + * Test execution strategy applied to single client builder, + * {@link SingleAddressHttpClientBuilder#executionStrategy(HttpExecutionStrategy)}. + */ + MULTI_DEFAULT_STRATEGY_SINGLE_BUILDER, + + /** + * Multi client builder uses {@link HttpExecutionStrategies#offloadNone()} strategy. + * Single client builder inherits multi client builder execution context strategy, + * {@link HttpExecutionStrategies#offloadNone()}. + * Test execution strategy applied to single client builder, + * {@link SingleAddressHttpClientBuilder#executionStrategy(HttpExecutionStrategy)}. + */ + MULTI_OFFLOAD_NONE_SINGLE_BUILDER } private static final HttpExecutionStrategy[] BUILDER_STRATEGIES = { @@ -183,6 +206,11 @@ static Stream casesSupplier() { List arguments = new ArrayList<>(); for (BuilderType builderType : BuilderType.values()) { for (HttpExecutionStrategy builderStrategy : BUILDER_STRATEGIES) { + if (BuilderType.MULTI_OFFLOAD_NONE_SINGLE_BUILDER == builderType && + null == builderStrategy) { + // null builderStrategy won't actually override, so skip. + continue; + } for (HttpExecutionStrategy filterStrategy : FILTER_STRATEGIES) { for (HttpExecutionStrategy lbStrategy : LB_STRATEGIES) { for (HttpExecutionStrategy cfStrategy : CF_STRATEGIES) { @@ -249,7 +277,7 @@ public HttpExecutionStrategy requiredOffloads() { }); } - if (null != builderStrategy) { + if (builderType != BuilderType.MULTI_BUILDER && null != builderStrategy) { clientBuilder.executionStrategy(builderStrategy); } }; @@ -266,6 +294,24 @@ public HttpExecutionStrategy requiredOffloads() { initializer.initialize(SCHEME, serverHostAndPort(context), singleClientBuilder); clientBuilder = singleClientBuilder::buildStreaming; break; + case MULTI_BUILDER: + case MULTI_DEFAULT_STRATEGY_SINGLE_BUILDER: + case MULTI_OFFLOAD_NONE_SINGLE_BUILDER: + requestTarget = SCHEME + "://" + serverHostAndPort(context) + PATH; + MultiAddressHttpClientBuilder multiClientBuilder = + HttpClients.forMultiAddressUrl() + .initializer(initializer) + .ioExecutor(CLIENT_CTX.ioExecutor()) + .executor(CLIENT_CTX.executor()); + if (BuilderType.MULTI_BUILDER == builderType && null != builderStrategy) { + multiClientBuilder.executionStrategy(builderStrategy); + } + if (BuilderType.MULTI_OFFLOAD_NONE_SINGLE_BUILDER == builderType && null != builderStrategy) { + // This is expected to ALWAYS be overridden in initializer. + multiClientBuilder.executionStrategy(offloadNone()); + } + clientBuilder = multiClientBuilder::buildStreaming; + break; default: throw new AssertionError("Unexpected clientType"); } @@ -338,6 +384,18 @@ private static HttpExecutionStrategy computeClientExecutionStrategy(final Builde switch (builderType) { case SINGLE_BUILDER: return defaultStrategy() == merged ? clientApi.strategy() : merged; + case MULTI_BUILDER: + return null == builder || defaultStrategy() == builder ? + clientApi.strategy().merge(merged) : + builder.hasOffloads() ? merged : builder; + case MULTI_DEFAULT_STRATEGY_SINGLE_BUILDER: + if (defaultStrategy() == merged || (null != builder && !builder.hasOffloads())) { + merged = offloadNone(); + } + return clientApi.strategy().merge(merged); + case MULTI_OFFLOAD_NONE_SINGLE_BUILDER: + return builder == null ? + offloadNone() : defaultStrategy() == merged ? offloadNone() : merged; default: throw new AssertionError("Unexpected builder type: " + builderType); } @@ -421,24 +479,26 @@ private static Buffer content(HttpExecutionContext ctx) { private static final class ClientInvokingThreadRecorder implements StreamingHttpClientFilterFactory { private Thread applicationThread = Thread.currentThread(); + private HttpExecutionStrategy expectedStrategy; private final EnumSet offloadPoints = EnumSet.noneOf(ClientOffloadPoint.class); private final ConcurrentMap invokingThreads = new ConcurrentHashMap<>(); private final Queue errors = new LinkedBlockingQueue<>(); - void reset(HttpExecutionStrategy streamingAsyncStrategy) { + void reset(HttpExecutionStrategy expectedStrategy) { invokingThreads.clear(); errors.clear(); offloadPoints.clear(); applicationThread = Thread.currentThread(); + this.expectedStrategy = expectedStrategy; // adjust expected offloads for specific execution strategy - if (streamingAsyncStrategy.isSendOffloaded()) { + if (expectedStrategy.isSendOffloaded()) { offloadPoints.add(Send); } - if (streamingAsyncStrategy.isMetadataReceiveOffloaded()) { + if (expectedStrategy.isMetadataReceiveOffloaded()) { offloadPoints.add(ReceiveMeta); } - if (streamingAsyncStrategy.isDataReceiveOffloaded()) { + if (expectedStrategy.isDataReceiveOffloaded()) { offloadPoints.add(ReceiveData); } } @@ -488,7 +548,8 @@ void recordThread(final ClientOffloadPoint offloadPoint, final HttpExecutionStra final AssertionError e = new AssertionError("Expected IoThread or " + applicationThread.getName() + " at " + offloadPoint + ", but was running on an offloading executor thread: " + current.getName() + - ". clientStrategy=" + clientStrategy + ", requestStrategy=" + requestStrategy); + ". clientStrategy=" + clientStrategy + ", expectedStrategy=" + expectedStrategy + + ", requestStrategy=" + requestStrategy); errors.add(e); } } @@ -498,8 +559,9 @@ void recordThread(final ClientOffloadPoint offloadPoint, final HttpExecutionStra } public void verifyOffloads(ClientApi clientApi, HttpExecutionStrategy clientStrategy, String apiStrategy) { - assertNoAsyncErrors("API=" + clientApi + ", clientStrategy=" + clientStrategy + ", apiStrategy=" + - apiStrategy + ". Async Errors! See suppressed", errors); + assertNoAsyncErrors("API=" + clientApi + ", apiStrategy=" + apiStrategy + + ", clientStrategy=" + clientStrategy + + ", expectedStrategy=" + expectedStrategy + ". Async Errors! See suppressed", errors); assertThat("Unexpected offload points recorded. " + invokingThreads, invokingThreads.size(), Matchers.is(ClientOffloadPoint.values().length)); }