Skip to content

Commit

Permalink
Enhance logging for execution strategy computation (#2224)
Browse files Browse the repository at this point in the history
Motivation:

Users should see when the resulting execution strategy is different from
what they configured at the builder and should be able to debug the
root cause.

Modifications:

- Log at `info` level when computed strategy adds more offloading to the
custom strategy specified at the builder;
- Add debug level logging for each computation step when changes are
detected;

Result:

Users see when strategy changes after analyzing filters and are able to
debug the cause.
  • Loading branch information
idelpivnitskiy authored May 27, 2022
1 parent 931ffe4 commit f41ce24
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 69 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2019, 2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2019, 2021-2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,69 +60,63 @@ private ClientStrategyInfluencerChainBuilder(ClientStrategyInfluencerChainBuilde
}

void add(StreamingHttpClientFilterFactory clientFilter) {
add("filter", clientFilter, clientFilter.requiredOffloads());
add(StreamingHttpClientFilterFactory.class, clientFilter, clientFilter.requiredOffloads());
}

@SuppressWarnings({"rawtypes", "unchecked"})
void add(HttpLoadBalancerFactory<?> lb) {
add("load balancer", lb, lb.requiredOffloads());
add(HttpLoadBalancerFactory.class, (HttpLoadBalancerFactory) lb, lb.requiredOffloads());
}

private void add(String purpose, ExecutionStrategyInfluencer<?> influencer, HttpExecutionStrategy strategy) {
private <T extends ExecutionStrategyInfluencer<HttpExecutionStrategy>> void add(
final Class<T> clazz, final T influencer, HttpExecutionStrategy strategy) {
if (offloadNever() == strategy) {
LOGGER.warn("{}#requiredOffloads() returns offloadNever(), which is unexpected. " +
"offloadNone() should be used instead. " +
"Making automatic adjustment, update the {} to avoid this warning.",
influencer, purpose);
offloadNeverWarning(clazz, influencer);
strategy = offloadNone();
}
if (defaultStrategy() == strategy) {
LOGGER.warn("{}#requiredOffloads() returns defaultStrategy(), which is unexpected. " +
"offloadAll() (safe default) or more appropriate custom strategy should be used instead." +
"Making automatic adjustment, update the {} to avoid this warning.",
influencer, purpose);
defaultStrategyWarning(clazz, influencer);
strategy = offloadAll();
}
clientChain = null != clientChain ? clientChain.merge(strategy) : strategy;
@Nullable
final HttpExecutionStrategy clientChain = this.clientChain;
this.clientChain = null != clientChain ? clientChain.merge(strategy) : strategy;
logIfChanges(clazz, influencer, clientChain, this.clientChain);
}

void add(ConnectionFactoryFilter<?, FilterableStreamingHttpConnection> connectionFactoryFilter) {
ExecutionStrategy filterOffloads = connectionFactoryFilter.requiredOffloads();
if (offloadNever() == filterOffloads) {
LOGGER.warn("{}#requiredOffloads() returns offloadNever(), which is unexpected. " +
"offloadNone() should be used instead. " +
"Making automatic adjustment, update the filter.",
connectionFactoryFilter);
offloadNeverWarning(ConnectionFactoryFilter.class, connectionFactoryFilter);
filterOffloads = offloadNone();
}
if (defaultStrategy() == filterOffloads) {
LOGGER.warn("{}#requiredOffloads() returns defaultStrategy(), which is unexpected. " +
"offloadAll() (safe default) or more appropriate custom strategy should be used instead." +
"Making automatic adjustment, consider updating the filter.",
connectionFactoryFilter);
defaultStrategyWarning(ConnectionFactoryFilter.class, connectionFactoryFilter);
filterOffloads = offloadAll();
}
connFactoryChain = null != connFactoryChain ?
@Nullable
final ConnectAndHttpExecutionStrategy connFactoryChain = this.connFactoryChain;
this.connFactoryChain = null != connFactoryChain ?
connFactoryChain.merge(filterOffloads) : ConnectAndHttpExecutionStrategy.from(filterOffloads);
logIfChanges(ConnectionFactoryFilter.class, connectionFactoryFilter, connFactoryChain, this.connFactoryChain);
}

void add(StreamingHttpConnectionFilterFactory connectionFilter) {
HttpExecutionStrategy filterOffloads = connectionFilter.requiredOffloads();
if (offloadNever() == filterOffloads) {
LOGGER.warn("{}#requiredOffloads() returns offloadNever(), which is unexpected. " +
"offloadNone() should be used instead. " +
"Making automatic adjustment, consider updating the filter.",
connectionFilter);
offloadNeverWarning(StreamingHttpConnectionFilterFactory.class, connectionFilter);
filterOffloads = offloadNone();
}
if (defaultStrategy() == filterOffloads) {
LOGGER.warn("{}#requiredOffloads() returns defaultStrategy(), which is unexpected. " +
"offloadAll() (safe default) or more appropriate custom strategy should be used instead." +
"Making automatic adjustment, consider updating the filter.",
connectionFilter);
defaultStrategyWarning(StreamingHttpConnectionFilterFactory.class, connectionFilter);
filterOffloads = offloadAll();
}
if (filterOffloads.hasOffloads()) {
connFilterChain = null != connFilterChain ? connFilterChain.merge(filterOffloads) : filterOffloads;
@Nullable
final HttpExecutionStrategy connFilterChain = this.connFilterChain;
this.connFilterChain = null != connFilterChain ? connFilterChain.merge(filterOffloads) : filterOffloads;
logIfChanges(StreamingHttpConnectionFilterFactory.class,
connectionFilter, connFilterChain, this.connFilterChain);
}
}

Expand Down Expand Up @@ -156,4 +150,26 @@ ExecutionStrategy buildForConnectionFactory() {
ClientStrategyInfluencerChainBuilder copy() {
return new ClientStrategyInfluencerChainBuilder(this);
}

private static <T extends ExecutionStrategyInfluencer<?>> void offloadNeverWarning(final Class<T> clazz,
final T influencer) {
LOGGER.warn("{}#requiredOffloads() returns offloadNever(), which is unexpected. offloadNone() should be used " +
"instead. Making automatic adjustment, update the {} to avoid this warning.",
influencer, clazz.getSimpleName());
}

private static <T extends ExecutionStrategyInfluencer<?>> void defaultStrategyWarning(final Class<T> clazz,
final T influencer) {
LOGGER.warn("{}#requiredOffloads() returns defaultStrategy(), which is unexpected. " +
"offloadAll() (safe default) or more appropriate custom strategy should be used instead." +
"Making automatic adjustment, update the {} to avoid this warning.",
influencer, clazz.getSimpleName());
}

private static <T extends ExecutionStrategyInfluencer<?>> void logIfChanges(final Class<T> clazz,
final T influencer, @Nullable final ExecutionStrategy before, @Nullable final ExecutionStrategy after) {
if (before != after) {
LOGGER.debug("{} '{}' changes execution strategy from '{}' to '{}'", clazz, influencer, before, after);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.servicetalk.http.api.HttpExecutionContext;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpExecutionStrategyInfluencer;
import io.servicetalk.http.api.HttpHeaderNames;
import io.servicetalk.http.api.HttpLifecycleObserver;
import io.servicetalk.http.api.HttpProtocolConfig;
Expand All @@ -40,7 +41,6 @@
import io.servicetalk.http.api.StreamingHttpServiceFilterFactory;
import io.servicetalk.logging.api.LogLevel;
import io.servicetalk.transport.api.ConnectionAcceptorFactory;
import io.servicetalk.transport.api.ExecutionStrategy;
import io.servicetalk.transport.api.ExecutionStrategyInfluencer;
import io.servicetalk.transport.api.IoExecutor;
import io.servicetalk.transport.api.ServerSslConfig;
Expand Down Expand Up @@ -92,7 +92,7 @@ final class DefaultHttpServerBuilder implements HttpServerBuilder {

private static StreamingHttpServiceFilterFactory buildFactory(List<StreamingHttpServiceFilterFactory> filters) {
return filters.stream()
.reduce((prev, filter) -> strategy -> prev.create(filter.create(strategy)))
.reduce((prev, filter) -> service -> prev.create(filter.create(service)))
.orElse(StreamingHttpServiceFilter::new); // unfortunate that we need extra layer
}

Expand All @@ -105,11 +105,17 @@ private static StreamingHttpService buildService(Stream<StreamingHttpServiceFilt
}

private static HttpExecutionStrategy computeRequiredStrategy(List<StreamingHttpServiceFilterFactory> filters,
HttpExecutionStrategy defaultStrategy) {
return filters.stream()
.map(ExecutionStrategyInfluencer::requiredOffloads)
.map(HttpExecutionStrategy::from)
.reduce(defaultStrategy, HttpExecutionStrategy::merge);
HttpExecutionStrategy serviceStrategy) {
HttpExecutionStrategy current = serviceStrategy;
for (StreamingHttpServiceFilterFactory filter : filters) {
HttpExecutionStrategy next = current.merge(filter.requiredOffloads());
if (current != next) {
LOGGER.debug("{} '{}' changes execution strategy from '{}' to '{}'",
StreamingHttpServiceFilterFactory.class, filter, current, next);
current = next;
}
}
return current;
}

private static <T> T checkNonOffloading(String desc, HttpExecutionStrategy assumeStrategy, T obj) {
Expand All @@ -122,15 +128,6 @@ private static <T> T checkNonOffloading(String desc, HttpExecutionStrategy assum
return obj;
}

private static HttpExecutionStrategy requiredOffloads(Object anything, HttpExecutionStrategy defaultOffloads) {
if (anything instanceof ExecutionStrategyInfluencer) {
ExecutionStrategy requiredOffloads = ((ExecutionStrategyInfluencer<?>) anything).requiredOffloads();
return HttpExecutionStrategy.from(requiredOffloads);
} else {
return defaultOffloads;
}
}

@Override
public HttpServerBuilder drainRequestPayloadBody(final boolean enable) {
this.drainRequestPayloadBody = enable;
Expand Down Expand Up @@ -257,22 +254,24 @@ public HttpServerBuilder bufferAllocator(final BufferAllocator allocator) {

@Override
public Single<HttpServerContext> listen(final HttpService service) {
return listenForAdapter(toStreamingHttpService(service, computeServiceStrategy(service)));
return listenForAdapter(toStreamingHttpService(service, computeServiceStrategy(HttpService.class, service)));
}

@Override
public Single<HttpServerContext> listenStreaming(final StreamingHttpService service) {
return listenForService(service, computeServiceStrategy(service));
return listenForService(service, computeServiceStrategy(StreamingHttpService.class, service));
}

@Override
public Single<HttpServerContext> listenBlocking(final BlockingHttpService service) {
return listenForAdapter(toStreamingHttpService(service, computeServiceStrategy(service)));
return listenForAdapter(toStreamingHttpService(service,
computeServiceStrategy(BlockingHttpService.class, service)));
}

@Override
public Single<HttpServerContext> listenBlockingStreaming(final BlockingStreamingHttpService service) {
return listenForAdapter(toStreamingHttpService(service, computeServiceStrategy(service)));
return listenForAdapter(toStreamingHttpService(service,
computeServiceStrategy(BlockingStreamingHttpService.class, service)));
}

private HttpExecutionContext buildExecutionContext(final HttpExecutionStrategy strategy) {
Expand All @@ -298,12 +297,12 @@ private Single<HttpServerContext> listenForAdapter(HttpApiConversions.ServiceAda
* </dl>
*
* @param rawService {@link StreamingHttpService} to use for the server.
* @param strategy the {@link HttpExecutionStrategy} to use for the service.
* @param computedStrategy the computed {@link HttpExecutionStrategy} to use for the service.
* @return A {@link Single} that completes when the server is successfully started or terminates with an error if
* the server could not be started.
*/
private Single<HttpServerContext> listenForService(final StreamingHttpService rawService,
final HttpExecutionStrategy strategy) {
final HttpExecutionStrategy computedStrategy) {
InfluencerConnectionAcceptor connectionAcceptor = connectionAcceptorFactory == null ? null :
InfluencerConnectionAcceptor.withStrategy(connectionAcceptorFactory.create(ACCEPT_ALL),
connectionAcceptorFactory.requiredOffloads());
Expand All @@ -313,28 +312,44 @@ private Single<HttpServerContext> listenForService(final StreamingHttpService ra

if (noOffloadServiceFilters.isEmpty()) {
filteredService = serviceFilters.isEmpty() ? rawService : buildService(serviceFilters.stream(), rawService);
executionContext = buildExecutionContext(strategy);
executionContext = buildExecutionContext(computedStrategy);
} else {
Stream<StreamingHttpServiceFilterFactory> nonOffloadingFilters = noOffloadServiceFilters.stream();

if (strategy.isRequestResponseOffloaded()) {
executionContext = buildExecutionContext(REQRESP_OFFLOADS.missing(strategy));
if (computedStrategy.isRequestResponseOffloaded()) {
executionContext = buildExecutionContext(REQRESP_OFFLOADS.missing(computedStrategy));
BooleanSupplier shouldOffload = executionContext.ioExecutor().shouldOffloadSupplier();
// We are going to have to offload, even if just to the raw service
OffloadingFilter offloadingFilter =
new OffloadingFilter(strategy, buildFactory(serviceFilters), shouldOffload);
new OffloadingFilter(computedStrategy, buildFactory(serviceFilters), shouldOffload);
nonOffloadingFilters = Stream.concat(nonOffloadingFilters, Stream.of(offloadingFilter));
} else {
// All the filters can be appended.
nonOffloadingFilters = Stream.concat(nonOffloadingFilters, serviceFilters.stream());
executionContext = buildExecutionContext(strategy);
executionContext = buildExecutionContext(computedStrategy);
}
filteredService = buildService(nonOffloadingFilters, rawService);
}

final HttpExecutionStrategy builderStrategy = this.strategy;
return doBind(executionContext, connectionAcceptor, filteredService)
.afterOnSuccess(serverContext -> LOGGER.debug("Server for address {} uses strategy {}",
serverContext.listenAddress(), strategy));
.afterOnSuccess(serverContext -> {
if (builderStrategy != defaultStrategy() &&
builderStrategy.missing(computedStrategy) != offloadNone()) {
LOGGER.info("Server for address {} created with the builder strategy {} but resulting " +
"computed strategy is {}. One of the filters or a final service enforce " +
"additional offloading. To find out what filter or service is " +
"it, enable debug level logging for {}.", serverContext.listenAddress(),
builderStrategy, computedStrategy, DefaultHttpServerBuilder.class);
} else if (builderStrategy == computedStrategy) {
LOGGER.debug("Server for address {} created with the execution strategy {}.",
serverContext.listenAddress(), computedStrategy);
} else {
LOGGER.debug("Server for address {} created with the builder strategy {}, " +
"resulting computed strategy is {}.",
serverContext.listenAddress(), builderStrategy, computedStrategy);
}
});
}

private Single<HttpServerContext> doBind(final HttpExecutionContext executionContext,
Expand All @@ -357,11 +372,14 @@ private Single<HttpServerContext> doBind(final HttpExecutionContext executionCon
filteredService, drainRequestPayloadBody);
}

private HttpExecutionStrategy computeServiceStrategy(Object service) {
HttpExecutionStrategy serviceStrategy = requiredOffloads(service, defaultStrategy());
HttpExecutionStrategy filterStrategy = computeRequiredStrategy(serviceFilters, serviceStrategy);
return defaultStrategy() == strategy ? filterStrategy :
strategy.hasOffloads() ? strategy.merge(filterStrategy) : strategy;
private <T extends HttpExecutionStrategyInfluencer> HttpExecutionStrategy computeServiceStrategy(
final Class<T> clazz, final T service) {
final HttpExecutionStrategy serviceStrategy = service.requiredOffloads();
LOGGER.debug("{} '{}' requires {} strategy.", clazz.getSimpleName(), service, serviceStrategy);
final HttpExecutionStrategy builderStrategy = this.strategy;
final HttpExecutionStrategy computedStrategy = computeRequiredStrategy(serviceFilters, serviceStrategy);
return defaultStrategy() == builderStrategy ? computedStrategy :
builderStrategy.hasOffloads() ? builderStrategy.merge(computedStrategy) : builderStrategy;
}

private static StreamingHttpService applyInternalFilters(StreamingHttpService service,
Expand Down
Loading

0 comments on commit f41ce24

Please sign in to comment.