Skip to content

Commit

Permalink
LoadBalancedStreamingHttpConnection doesn't need connect strategy (#…
Browse files Browse the repository at this point in the history
…2274)

Motivation:

The `connectStrategy` propagated to `AbstractLBHttpConnectionFactory`
should be used only for `newConnection` operation, but not for the
conversions inside `LoadBalancedStreamingHttpConnection`. That strategy
has to be taken from the `ExecutionContext` that takes into account
all filters that are applied to a reserved connection.

Modifications:

- `LoadBalancedStreamingHttpConnection` uses a strategy from execution
context instead of a `connectStrategy`;
- Validate `protocolBinding` in `AbstractLBHttpConnectionFactory` before
allocating `filterableConnectionFactory`;
- Skip the merge of `connectionFactoryStrategy` with
`ProxyConnectConnectionFactoryFilter` bcz we know the internal filters
never block or modify the strategy;

Result:

`LoadBalancedStreamingHttpConnection` uses the correct strategy for
conversions.
  • Loading branch information
idelpivnitskiy authored Jul 15, 2022
1 parent 7575fea commit a09e56b
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.FilterableStreamingHttpLoadBalancedConnection;
import io.servicetalk.http.api.HttpExecutionContext;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpProtocolVersion;
import io.servicetalk.http.api.StreamingHttpConnectionFilterFactory;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
Expand All @@ -52,15 +50,11 @@ abstract class AbstractLBHttpConnectionFactory<ResolvedAddress>
implements ConnectionFactory<ResolvedAddress, LoadBalancedStreamingHttpConnection> {

@Nullable
final StreamingHttpConnectionFilterFactory connectionFilterFunction;
private final StreamingHttpConnectionFilterFactory connectionFilterFunction;
final ReadOnlyHttpClientConfig config;
final HttpExecutionContext executionContext;
final Function<HttpProtocolVersion, StreamingHttpRequestResponseFactory> reqRespFactoryFunc;
/**
* Computed execution strategy of the connection factory.
*/
final ExecutionStrategy connectStrategy;
final ConnectionFactory<ResolvedAddress, FilterableStreamingHttpConnection> filterableConnectionFactory;
private final ConnectionFactory<ResolvedAddress, FilterableStreamingHttpConnection> filterableConnectionFactory;
private final Function<FilterableStreamingHttpConnection,
FilterableStreamingHttpLoadBalancedConnection> protocolBinding;

Expand All @@ -76,7 +70,8 @@ abstract class AbstractLBHttpConnectionFactory<ResolvedAddress>
this.config = requireNonNull(config);
this.executionContext = requireNonNull(executionContext);
this.reqRespFactoryFunc = requireNonNull(reqRespFactoryFunc);
this.connectStrategy = connectStrategy;
requireNonNull(connectStrategy);
this.protocolBinding = requireNonNull(protocolBinding);
filterableConnectionFactory = connectionFactoryFilter.create(
// provide the supplier of connections.
new ConnectionFactory<ResolvedAddress, FilterableStreamingHttpConnection>() {
Expand Down Expand Up @@ -111,7 +106,6 @@ public Completable closeAsyncGracefully() {
return close.closeAsyncGracefully();
}
});
this.protocolBinding = protocolBinding;
}

@Override
Expand All @@ -136,8 +130,7 @@ public final Single<LoadBalancedStreamingHttpConnection> newConnection(
final ReservableRequestConcurrencyController concurrencyController =
newConcurrencyController(maxConcurrency, onClosing);
return new LoadBalancedStreamingHttpConnection(protocolBinding.apply(filteredConnection),
concurrencyController, connectStrategy instanceof HttpExecutionStrategy ?
(HttpExecutionStrategy) connectStrategy : HttpExecutionStrategies.offloadNone());
concurrencyController);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,16 +233,16 @@ public HttpExecutionStrategy executionStrategy() {

ConnectionFactoryFilter<R, FilterableStreamingHttpConnection> connectionFactoryFilter =
ctx.builder.connectionFactoryFilter;
ExecutionStrategy connectionFactoryStrategy =
final ExecutionStrategy connectionFactoryStrategy =
ctx.builder.strategyComputation.buildForConnectionFactory();

final SslContext sslContext = roConfig.tcpConfig().sslContext();
if (roConfig.hasProxy() && sslContext != null) {
assert roConfig.connectAddress() != null;
ConnectionFactoryFilter<R, FilterableStreamingHttpConnection> proxy =
final ConnectionFactoryFilter<R, FilterableStreamingHttpConnection> proxy =
new ProxyConnectConnectionFactoryFilter<>(roConfig.connectAddress());
assert !proxy.requiredOffloads().hasOffloads();
connectionFactoryFilter = proxy.append(connectionFactoryFilter);
connectionFactoryStrategy = connectionFactoryStrategy.merge(proxy.requiredOffloads());
}

final HttpExecutionStrategy builderStrategy = executionContext.executionStrategy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.servicetalk.http.api.HttpConnectionContext;
import io.servicetalk.http.api.HttpEventKey;
import io.servicetalk.http.api.HttpExecutionContext;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpRequestMethod;
import io.servicetalk.http.api.ReservedBlockingHttpConnection;
import io.servicetalk.http.api.ReservedBlockingStreamingHttpConnection;
Expand All @@ -34,7 +33,6 @@
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.transport.api.ExecutionStrategyInfluencer;

import static io.servicetalk.http.api.HttpApiConversions.toReservedBlockingConnection;
import static io.servicetalk.http.api.HttpApiConversions.toReservedBlockingStreamingConnection;
Expand All @@ -45,22 +43,14 @@
* Makes the wrapped {@link StreamingHttpConnection} aware of the {@link LoadBalancer}.
*/
final class LoadBalancedStreamingHttpConnection implements FilterableStreamingHttpLoadBalancedConnection,
ReservedStreamingHttpConnection, ReservableRequestConcurrencyController,
// Since we do not have filters for reserved connection, we rely on the original implementation to
// be an influencer hence we can try to correctly delegate when possible.
// Reserved connection given to the user will use the correct strategy and influencer chain since
// we wrap before returning to the user.
ExecutionStrategyInfluencer<HttpExecutionStrategy> {
ReservedStreamingHttpConnection, ReservableRequestConcurrencyController {
private final ReservableRequestConcurrencyController limiter;
private final FilterableStreamingHttpLoadBalancedConnection filteredConnection;
private final HttpExecutionStrategy connectStrategy;

LoadBalancedStreamingHttpConnection(FilterableStreamingHttpLoadBalancedConnection filteredConnection,
ReservableRequestConcurrencyController limiter,
HttpExecutionStrategy connectStrategy) {
ReservableRequestConcurrencyController limiter) {
this.filteredConnection = filteredConnection;
this.limiter = requireNonNull(limiter);
this.connectStrategy = connectStrategy;
}

@Override
Expand Down Expand Up @@ -128,24 +118,22 @@ public StreamingHttpRequest newRequest(final HttpRequestMethod method, final Str
return filteredConnection.newRequest(method, requestTarget);
}

// Client filters will also be applied for reserved connections that are returned to the user, see
// ClientFilterToReservedConnectionFilter. Therefore, we should take the finally computed strategy from the
// ExecutionContext that takes into account all filters.
@Override
public ReservedHttpConnection asConnection() {
return toReservedConnection(this, connectStrategy);
return toReservedConnection(this, executionContext().executionStrategy());
}

@Override
public ReservedBlockingStreamingHttpConnection asBlockingStreamingConnection() {
return toReservedBlockingStreamingConnection(this, connectStrategy);
return toReservedBlockingStreamingConnection(this, executionContext().executionStrategy());
}

@Override
public ReservedBlockingHttpConnection asBlockingConnection() {
return toReservedBlockingConnection(this, connectStrategy);
}

@Override
public HttpExecutionStrategy requiredOffloads() {
return connectStrategy;
return toReservedBlockingConnection(this, executionContext().executionStrategy());
}

@Override
Expand Down

0 comments on commit a09e56b

Please sign in to comment.