Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance logging for execution strategy computation #2224

Merged
merged 6 commits into from
May 27, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,69 +60,62 @@ private ClientStrategyInfluencerChainBuilder(ClientStrategyInfluencerChainBuilde
}

void add(StreamingHttpClientFilterFactory clientFilter) {
add("filter", clientFilter, clientFilter.requiredOffloads());
add(StreamingHttpClientFilterFactory.class, clientFilter, clientFilter.requiredOffloads());
}

void add(HttpLoadBalancerFactory<?> lb) {
add("load balancer", lb, lb.requiredOffloads());
add(HttpLoadBalancerFactory.class, (HttpLoadBalancerFactory) lb, lb.requiredOffloads());
}

private void add(String purpose, ExecutionStrategyInfluencer<?> influencer, HttpExecutionStrategy strategy) {
private <E extends ExecutionStrategyInfluencer<HttpExecutionStrategy>> void add(
final Class<E> clazz, final E influencer, HttpExecutionStrategy strategy) {
if (offloadNever() == strategy) {
LOGGER.warn("{}#requiredOffloads() returns offloadNever(), which is unexpected. " +
"offloadNone() should be used instead. " +
"Making automatic adjustment, update the {} to avoid this warning.",
influencer, purpose);
offloadNeverWarning(clazz, influencer);
strategy = offloadNone();
}
if (defaultStrategy() == strategy) {
LOGGER.warn("{}#requiredOffloads() returns defaultStrategy(), which is unexpected. " +
"offloadAll() (safe default) or more appropriate custom strategy should be used instead." +
"Making automatic adjustment, update the {} to avoid this warning.",
influencer, purpose);
defaultStrategyWarning(clazz, influencer);
strategy = offloadAll();
}
clientChain = null != clientChain ? clientChain.merge(strategy) : strategy;
@Nullable
final HttpExecutionStrategy clientChain = this.clientChain;
this.clientChain = null != clientChain ? clientChain.merge(strategy) : strategy;
logIfChanges(clazz, influencer, clientChain, this.clientChain);
}

void add(ConnectionFactoryFilter<?, FilterableStreamingHttpConnection> connectionFactoryFilter) {
ExecutionStrategy filterOffloads = connectionFactoryFilter.requiredOffloads();
if (offloadNever() == filterOffloads) {
LOGGER.warn("{}#requiredOffloads() returns offloadNever(), which is unexpected. " +
"offloadNone() should be used instead. " +
"Making automatic adjustment, update the filter.",
connectionFactoryFilter);
offloadNeverWarning(ConnectionFactoryFilter.class, connectionFactoryFilter);
filterOffloads = offloadNone();
}
if (defaultStrategy() == filterOffloads) {
LOGGER.warn("{}#requiredOffloads() returns defaultStrategy(), which is unexpected. " +
"offloadAll() (safe default) or more appropriate custom strategy should be used instead." +
"Making automatic adjustment, consider updating the filter.",
connectionFactoryFilter);
defaultStrategyWarning(ConnectionFactoryFilter.class, connectionFactoryFilter);
filterOffloads = offloadAll();
}
connFactoryChain = null != connFactoryChain ?
@Nullable
final ConnectAndHttpExecutionStrategy connFactoryChain = this.connFactoryChain;
this.connFactoryChain = null != connFactoryChain ?
connFactoryChain.merge(filterOffloads) : ConnectAndHttpExecutionStrategy.from(filterOffloads);
logIfChanges(ConnectionFactoryFilter.class, connectionFactoryFilter, connFactoryChain, this.connFactoryChain);
}

void add(StreamingHttpConnectionFilterFactory connectionFilter) {
HttpExecutionStrategy filterOffloads = connectionFilter.requiredOffloads();
if (offloadNever() == filterOffloads) {
LOGGER.warn("{}#requiredOffloads() returns offloadNever(), which is unexpected. " +
"offloadNone() should be used instead. " +
"Making automatic adjustment, consider updating the filter.",
connectionFilter);
offloadNeverWarning(StreamingHttpConnectionFilterFactory.class, connectionFilter);
filterOffloads = offloadNone();
}
if (defaultStrategy() == filterOffloads) {
LOGGER.warn("{}#requiredOffloads() returns defaultStrategy(), which is unexpected. " +
"offloadAll() (safe default) or more appropriate custom strategy should be used instead." +
"Making automatic adjustment, consider updating the filter.",
connectionFilter);
defaultStrategyWarning(StreamingHttpConnectionFilterFactory.class, connectionFilter);
filterOffloads = offloadAll();
}
if (filterOffloads.hasOffloads()) {
connFilterChain = null != connFilterChain ? connFilterChain.merge(filterOffloads) : filterOffloads;
@Nullable
final HttpExecutionStrategy connFilterChain = this.connFilterChain;
this.connFilterChain = null != connFilterChain ? connFilterChain.merge(filterOffloads) : filterOffloads;
logIfChanges(StreamingHttpConnectionFilterFactory.class,
connectionFilter, connFilterChain, this.connFilterChain);
}
}

Expand Down Expand Up @@ -156,4 +149,26 @@ ExecutionStrategy buildForConnectionFactory() {
ClientStrategyInfluencerChainBuilder copy() {
return new ClientStrategyInfluencerChainBuilder(this);
}

private static <E extends ExecutionStrategyInfluencer<?>> void offloadNeverWarning(
final Class<E> clazz, final E influencer) {
LOGGER.warn("{}#requiredOffloads() returns offloadNever(), which is unexpected. offloadNone() should be used " +
"instead. Making automatic adjustment, update the {} to avoid this warning.",
influencer, clazz.getSimpleName());
}

private static <E extends ExecutionStrategyInfluencer<?>> void defaultStrategyWarning(
final Class<E> clazz, final E influencer) {
LOGGER.warn("{}#requiredOffloads() returns defaultStrategy(), which is unexpected. " +
"offloadAll() (safe default) or more appropriate custom strategy should be used instead." +
"Making automatic adjustment, update the {} to avoid this warning.",
influencer, clazz.getSimpleName());
}

private static <S extends ExecutionStrategy, E extends ExecutionStrategyInfluencer<S>> void logIfChanges(
final Class<E> clazz, final E influencer, final @Nullable S before, final @Nullable S after) {
if (before != after) {
LOGGER.debug("{} '{}' changes execution strategy from '{}' to '{}'", clazz, influencer, before, after);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.servicetalk.http.api.HttpExecutionContext;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpExecutionStrategyInfluencer;
import io.servicetalk.http.api.HttpHeaderNames;
import io.servicetalk.http.api.HttpLifecycleObserver;
import io.servicetalk.http.api.HttpProtocolConfig;
Expand All @@ -40,7 +41,6 @@
import io.servicetalk.http.api.StreamingHttpServiceFilterFactory;
import io.servicetalk.logging.api.LogLevel;
import io.servicetalk.transport.api.ConnectionAcceptorFactory;
import io.servicetalk.transport.api.ExecutionStrategy;
import io.servicetalk.transport.api.ExecutionStrategyInfluencer;
import io.servicetalk.transport.api.IoExecutor;
import io.servicetalk.transport.api.ServerSslConfig;
Expand Down Expand Up @@ -92,7 +92,7 @@ final class DefaultHttpServerBuilder implements HttpServerBuilder {

private static StreamingHttpServiceFilterFactory buildFactory(List<StreamingHttpServiceFilterFactory> filters) {
return filters.stream()
.reduce((prev, filter) -> strategy -> prev.create(filter.create(strategy)))
.reduce((prev, filter) -> service -> prev.create(filter.create(service)))
.orElse(StreamingHttpServiceFilter::new); // unfortunate that we need extra layer
}

Expand All @@ -105,11 +105,17 @@ private static StreamingHttpService buildService(Stream<StreamingHttpServiceFilt
}

private static HttpExecutionStrategy computeRequiredStrategy(List<StreamingHttpServiceFilterFactory> filters,
HttpExecutionStrategy defaultStrategy) {
return filters.stream()
.map(ExecutionStrategyInfluencer::requiredOffloads)
.map(HttpExecutionStrategy::from)
.reduce(defaultStrategy, HttpExecutionStrategy::merge);
HttpExecutionStrategy serviceStrategy) {
HttpExecutionStrategy current = serviceStrategy;
for (StreamingHttpServiceFilterFactory filter : filters) {
HttpExecutionStrategy next = current.merge(filter.requiredOffloads());
if (current != next) {
LOGGER.debug("{} '{}' changes execution strategy from '{}' to '{}'",
StreamingHttpServiceFilterFactory.class, filter, current, next);
current = next;
}
}
return current;
}

private static <T> T checkNonOffloading(String desc, HttpExecutionStrategy assumeStrategy, T obj) {
Expand All @@ -122,15 +128,6 @@ private static <T> T checkNonOffloading(String desc, HttpExecutionStrategy assum
return obj;
}

private static HttpExecutionStrategy requiredOffloads(Object anything, HttpExecutionStrategy defaultOffloads) {
if (anything instanceof ExecutionStrategyInfluencer) {
ExecutionStrategy requiredOffloads = ((ExecutionStrategyInfluencer<?>) anything).requiredOffloads();
return HttpExecutionStrategy.from(requiredOffloads);
} else {
return defaultOffloads;
}
}

@Override
public HttpServerBuilder drainRequestPayloadBody(final boolean enable) {
this.drainRequestPayloadBody = enable;
Expand Down Expand Up @@ -257,22 +254,24 @@ public HttpServerBuilder bufferAllocator(final BufferAllocator allocator) {

@Override
public Single<HttpServerContext> listen(final HttpService service) {
return listenForAdapter(toStreamingHttpService(service, computeServiceStrategy(service)));
return listenForAdapter(toStreamingHttpService(service, computeServiceStrategy(HttpService.class, service)));
}

@Override
public Single<HttpServerContext> listenStreaming(final StreamingHttpService service) {
return listenForService(service, computeServiceStrategy(service));
return listenForService(service, computeServiceStrategy(StreamingHttpService.class, service));
}

@Override
public Single<HttpServerContext> listenBlocking(final BlockingHttpService service) {
return listenForAdapter(toStreamingHttpService(service, computeServiceStrategy(service)));
return listenForAdapter(toStreamingHttpService(service,
computeServiceStrategy(BlockingHttpService.class, service)));
}

@Override
public Single<HttpServerContext> listenBlockingStreaming(final BlockingStreamingHttpService service) {
return listenForAdapter(toStreamingHttpService(service, computeServiceStrategy(service)));
return listenForAdapter(toStreamingHttpService(service,
computeServiceStrategy(BlockingStreamingHttpService.class, service)));
}

private HttpExecutionContext buildExecutionContext(final HttpExecutionStrategy strategy) {
Expand All @@ -298,12 +297,12 @@ private Single<HttpServerContext> listenForAdapter(HttpApiConversions.ServiceAda
* </dl>
*
* @param rawService {@link StreamingHttpService} to use for the server.
* @param strategy the {@link HttpExecutionStrategy} to use for the service.
* @param computedStrategy the computed {@link HttpExecutionStrategy} to use for the service.
* @return A {@link Single} that completes when the server is successfully started or terminates with an error if
* the server could not be started.
*/
private Single<HttpServerContext> listenForService(final StreamingHttpService rawService,
final HttpExecutionStrategy strategy) {
final HttpExecutionStrategy computedStrategy) {
InfluencerConnectionAcceptor connectionAcceptor = connectionAcceptorFactory == null ? null :
InfluencerConnectionAcceptor.withStrategy(connectionAcceptorFactory.create(ACCEPT_ALL),
connectionAcceptorFactory.requiredOffloads());
Expand All @@ -313,28 +312,46 @@ private Single<HttpServerContext> listenForService(final StreamingHttpService ra

if (noOffloadServiceFilters.isEmpty()) {
filteredService = serviceFilters.isEmpty() ? rawService : buildService(serviceFilters.stream(), rawService);
executionContext = buildExecutionContext(strategy);
executionContext = buildExecutionContext(computedStrategy);
} else {
Stream<StreamingHttpServiceFilterFactory> nonOffloadingFilters = noOffloadServiceFilters.stream();

if (strategy.isRequestResponseOffloaded()) {
executionContext = buildExecutionContext(REQRESP_OFFLOADS.missing(strategy));
if (computedStrategy.isRequestResponseOffloaded()) {
executionContext = buildExecutionContext(REQRESP_OFFLOADS.missing(computedStrategy));
BooleanSupplier shouldOffload = executionContext.ioExecutor().shouldOffloadSupplier();
// We are going to have to offload, even if just to the raw service
OffloadingFilter offloadingFilter =
new OffloadingFilter(strategy, buildFactory(serviceFilters), shouldOffload);
new OffloadingFilter(computedStrategy, buildFactory(serviceFilters), shouldOffload);
nonOffloadingFilters = Stream.concat(nonOffloadingFilters, Stream.of(offloadingFilter));
} else {
// All the filters can be appended.
nonOffloadingFilters = Stream.concat(nonOffloadingFilters, serviceFilters.stream());
executionContext = buildExecutionContext(strategy);
executionContext = buildExecutionContext(computedStrategy);
}
filteredService = buildService(nonOffloadingFilters, rawService);
}

final HttpExecutionStrategy builderStrategy = this.strategy;
return doBind(executionContext, connectionAcceptor, filteredService)
.afterOnSuccess(serverContext -> LOGGER.debug("Server for address {} uses strategy {}",
serverContext.listenAddress(), strategy));
.afterOnSuccess(serverContext -> {
if (builderStrategy != defaultStrategy() &&
builderStrategy.missing(computedStrategy) != offloadNone()) {
LOGGER.info("Server for address {} created with the builder strategy {} but resulting " +
"computed strategy is {}. One of the filters or a final service enforce " +
"additional offloading. To find out what filter or service is " +
"it, enable debug level logging for {}.", serverContext.listenAddress(),
builderStrategy, computedStrategy, DefaultHttpServerBuilder.class);
} else if (builderStrategy == computedStrategy) {
LOGGER.debug("Server for address {} created with the execution strategy {}.",
serverContext.listenAddress(), computedStrategy);
} else {
LOGGER.debug("Server for address {} created with the builder strategy {}, " +
"resulting computed strategy is {}.",
serverContext.listenAddress(), builderStrategy, computedStrategy);
}
LOGGER.debug("Server for address {} uses strategy {}",
serverContext.listenAddress(), computedStrategy);
});
}

private Single<HttpServerContext> doBind(final HttpExecutionContext executionContext,
Expand All @@ -357,11 +374,14 @@ private Single<HttpServerContext> doBind(final HttpExecutionContext executionCon
filteredService, drainRequestPayloadBody);
}

private HttpExecutionStrategy computeServiceStrategy(Object service) {
HttpExecutionStrategy serviceStrategy = requiredOffloads(service, defaultStrategy());
HttpExecutionStrategy filterStrategy = computeRequiredStrategy(serviceFilters, serviceStrategy);
return defaultStrategy() == strategy ? filterStrategy :
strategy.hasOffloads() ? strategy.merge(filterStrategy) : strategy;
private HttpExecutionStrategy computeServiceStrategy(Class<?> clazz, HttpExecutionStrategyInfluencer service) {
final HttpExecutionStrategy serviceStrategy = service.requiredOffloads();
LOGGER.debug("{} {} requires {} strategy.", clazz.getSimpleName(), service, serviceStrategy);
final HttpExecutionStrategy builderStrategy = this.strategy;
final HttpExecutionStrategy computedStrategy =
computeRequiredStrategy(serviceFilters, serviceStrategy);
return defaultStrategy() == builderStrategy ? computedStrategy :
builderStrategy.hasOffloads() ? builderStrategy.merge(computedStrategy) : builderStrategy;
}
idelpivnitskiy marked this conversation as resolved.
Show resolved Hide resolved

private static StreamingHttpService applyInternalFilters(StreamingHttpService service,
Expand Down
Loading