Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple multi-address and partitioned client builders from HttpClientBuildContext #2136

Merged
merged 4 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@
import io.servicetalk.http.api.HttpRequestMethod;
import io.servicetalk.http.api.MultiAddressHttpClientBuilder;
import io.servicetalk.http.api.RedirectConfig;
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import io.servicetalk.http.api.StreamingHttpClient;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.HttpClientBuildContext;
import io.servicetalk.http.utils.RedirectingHttpRequesterFilter;
import io.servicetalk.transport.api.ClientSslConfig;
import io.servicetalk.transport.api.ClientSslConfigBuilder;
Expand All @@ -64,6 +64,7 @@
import static io.servicetalk.concurrent.api.Single.defer;
import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverCompleteFromSource;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.setExecutionContext;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -85,7 +86,8 @@ final class DefaultMultiAddressUrlHttpClientBuilder

private static final String HTTPS_SCHEME = HTTPS.toString();

private final DefaultSingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> builderTemplate;
private final Function<HostAndPort, SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress>> builderFactory;
private final HttpExecutionContextBuilder executionContextBuilder = new HttpExecutionContextBuilder();

@Nullable
private HttpHeadersFactory headersFactory;
Expand All @@ -95,19 +97,17 @@ final class DefaultMultiAddressUrlHttpClientBuilder
private SingleAddressInitializer<HostAndPort, InetSocketAddress> singleAddressInitializer;

DefaultMultiAddressUrlHttpClientBuilder(
final DefaultSingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> builderTemplate) {
this.builderTemplate = requireNonNull(builderTemplate);
final Function<HostAndPort, SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress>> bFactory) {
this.builderFactory = requireNonNull(bFactory);
}

@Override
public StreamingHttpClient buildStreaming() {
final CompositeCloseable closeables = newCompositeCloseable();
try {
final HttpClientBuildContext<HostAndPort, InetSocketAddress> buildContext = builderTemplate.copyBuildCtx();

final ClientFactory clientFactory = new ClientFactory(buildContext.builder, singleAddressInitializer);

final HttpExecutionContext executionContext = buildContext.builder.executionContextBuilder.build();
final HttpExecutionContext executionContext = executionContextBuilder.build();
final ClientFactory clientFactory = new ClientFactory(builderFactory, executionContext,
singleAddressInitializer);
final CachingKeyFactory keyFactory = closeables.prepend(new CachingKeyFactory());
final HttpHeadersFactory headersFactory = this.headersFactory;
FilterableStreamingHttpClient urlClient = closeables.prepend(
Expand All @@ -120,13 +120,8 @@ public StreamingHttpClient buildStreaming() {
urlClient = redirectConfig == null ? urlClient :
new RedirectingHttpRequesterFilter(redirectConfig).create(urlClient);

HttpExecutionStrategy computedStrategy =
buildContext.builder.computeChainStrategy(executionContext.executionStrategy());

LOGGER.debug("Client created with base strategy {} → computed strategy {}",
executionContext.executionStrategy(), computedStrategy);

return new FilterableClientToClient(urlClient, computedStrategy);
LOGGER.debug("Multi-address client created with base strategy {}", executionContext.executionStrategy());
return new FilterableClientToClient(urlClient, executionContext.executionStrategy());
} catch (final Throwable t) {
closeables.closeAsync().subscribe();
throw t;
Expand Down Expand Up @@ -220,32 +215,36 @@ public int hashCode() {

private static final class ClientFactory implements Function<UrlKey, FilterableStreamingHttpClient> {
private static final ClientSslConfig DEFAULT_CLIENT_SSL_CONFIG = new ClientSslConfigBuilder().build();
private final DefaultSingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> builderTemplate;
private final Function<HostAndPort, SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress>>
builderFactory;
private final HttpExecutionContext executionContext;
@Nullable
private final SingleAddressInitializer<HostAndPort, InetSocketAddress> singleAddressInitializer;

ClientFactory(
final DefaultSingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> builderTemplate,
ClientFactory(final Function<HostAndPort, SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress>>
builderFactory,
final HttpExecutionContext executionContext,
@Nullable final SingleAddressInitializer<HostAndPort, InetSocketAddress> singleAddressInitializer) {
this.builderTemplate = builderTemplate;
this.builderFactory = builderFactory;
this.executionContext = executionContext;
this.singleAddressInitializer = singleAddressInitializer;
}

@Override
public StreamingHttpClient apply(final UrlKey urlKey) {
// Copy existing builder to prevent changes at runtime when concurrently creating clients for new addresses
final HttpClientBuildContext<HostAndPort, InetSocketAddress> buildContext =
builderTemplate.copyBuildCtx(urlKey.hostAndPort);
final SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> builder =
requireNonNull(builderFactory.apply(urlKey.hostAndPort));

setExecutionContext(builder, executionContext);
if (HTTPS_SCHEME.equalsIgnoreCase(urlKey.scheme)) {
buildContext.builder.sslConfig(DEFAULT_CLIENT_SSL_CONFIG);
builder.sslConfig(DEFAULT_CLIENT_SSL_CONFIG);
}

if (singleAddressInitializer != null) {
singleAddressInitializer.initialize(urlKey.scheme, urlKey.hostAndPort, buildContext.builder);
singleAddressInitializer.initialize(urlKey.scheme, urlKey.hostAndPort, builder);
}

return buildContext.build();
return builder.buildStreaming();
}
}

Expand Down Expand Up @@ -330,27 +329,27 @@ public StreamingHttpRequest newRequest(final HttpRequestMethod method, final Str

@Override
public MultiAddressHttpClientBuilder<HostAndPort, InetSocketAddress> ioExecutor(final IoExecutor ioExecutor) {
builderTemplate.ioExecutor(ioExecutor);
executionContextBuilder.ioExecutor(ioExecutor);
return this;
}

@Override
public MultiAddressHttpClientBuilder<HostAndPort, InetSocketAddress> executor(final Executor executor) {
builderTemplate.executor(executor);
executionContextBuilder.executor(executor);
return this;
}

@Override
public MultiAddressHttpClientBuilder<HostAndPort, InetSocketAddress> bufferAllocator(
final BufferAllocator allocator) {
builderTemplate.bufferAllocator(allocator);
executionContextBuilder.bufferAllocator(allocator);
return this;
}

@Override
public MultiAddressHttpClientBuilder<HostAndPort, InetSocketAddress> executionStrategy(
final HttpExecutionStrategy strategy) {
builderTemplate.executionStrategy(strategy);
executionContextBuilder.executionStrategy(strategy);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,21 @@
import io.servicetalk.http.api.HttpHeadersFactory;
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.HttpRequestMethod;
import io.servicetalk.http.api.PartitionHttpClientBuilderConfigurator;
import io.servicetalk.http.api.PartitionedHttpClientBuilder;
import io.servicetalk.http.api.ReservedStreamingHttpConnection;
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import io.servicetalk.http.api.StreamingHttpClient;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.HttpClientBuildContext;
import io.servicetalk.transport.api.IoExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable;
Expand All @@ -66,75 +66,73 @@
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.SD_RETRY_STRATEGY_INIT_DURATION;
import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.SD_RETRY_STRATEGY_JITTER;
import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.setExecutionContext;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;

final class DefaultPartitionedHttpClientBuilder<U, R> implements PartitionedHttpClientBuilder<U, R> {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPartitionedHttpClientBuilder.class);

private final U address;
private final Function<HttpRequestMetaData, PartitionAttributesBuilder> partitionAttributesBuilderFactory;
private final Supplier<SingleAddressHttpClientBuilder<U, R>> builderFactory;
private final HttpExecutionContextBuilder executionContextBuilder = new HttpExecutionContextBuilder();
private ServiceDiscoverer<U, R, PartitionedServiceDiscovererEvent<R>> serviceDiscoverer;
@Nullable
private BiIntFunction<Throwable, ? extends Completable> serviceDiscovererRetryStrategy;
private final Function<HttpRequestMetaData, PartitionAttributesBuilder> partitionAttributesBuilderFactory;
private final DefaultSingleAddressHttpClientBuilder<U, R> builderTemplate;
private int serviceDiscoveryMaxQueueSize = 32;
@Nullable
private HttpHeadersFactory headersFactory;
@Nullable
private SingleAddressInitializer<U, R> clientInitializer;
private PartitionHttpClientBuilderConfigurator<U, R> clientFilterFunction = (__, ___) -> { };
private PartitionMapFactory partitionMapFactory = PowerSetPartitionMapFactory.INSTANCE;
private int serviceDiscoveryMaxQueueSize = 32;

DefaultPartitionedHttpClientBuilder(
final DefaultSingleAddressHttpClientBuilder<U, R> builderTemplate,
final U address,
final Supplier<SingleAddressHttpClientBuilder<U, R>> builderFactory,
final ServiceDiscoverer<U, R, PartitionedServiceDiscovererEvent<R>> serviceDiscoverer,
final Function<HttpRequestMetaData, PartitionAttributesBuilder> partitionAttributesBuilderFactory) {
this.builderTemplate = requireNonNull(builderTemplate);
this.address = requireNonNull(address);
this.builderFactory = requireNonNull(builderFactory);
this.serviceDiscoverer = requireNonNull(serviceDiscoverer);
this.partitionAttributesBuilderFactory = requireNonNull(partitionAttributesBuilderFactory);
}

@Override
public StreamingHttpClient buildStreaming() {
final HttpClientBuildContext<U, R> buildContext = builderTemplate.copyBuildCtx();
final HttpExecutionContext executionContext = buildContext.builder.build().executionContext();
final HttpExecutionContext executionContext = executionContextBuilder.build();
BiIntFunction<Throwable, ? extends Completable> sdRetryStrategy = serviceDiscovererRetryStrategy;
if (sdRetryStrategy == null) {
sdRetryStrategy = retryWithConstantBackoffDeltaJitter(__ -> true, SD_RETRY_STRATEGY_INIT_DURATION,
SD_RETRY_STRATEGY_JITTER, executionContext.executor());
}
ServiceDiscoverer<U, R, PartitionedServiceDiscovererEvent<R>> psd =
final ServiceDiscoverer<U, R, PartitionedServiceDiscovererEvent<R>> psd =
new DefaultSingleAddressHttpClientBuilder.RetryingServiceDiscoverer<>(serviceDiscoverer,
sdRetryStrategy);

final PartitionedClientFactory<U, R, FilterableStreamingHttpClient> clientFactory = (pa, sd) -> {
// build new context, user may have changed anything on the builder from the filter
DefaultSingleAddressHttpClientBuilder<U, R> builder = buildContext.builder.copyBuildCtx().builder;
final SingleAddressHttpClientBuilder<U, R> builder = requireNonNull(builderFactory.get());
builder.serviceDiscoverer(sd);
clientFilterFunction.configureForPartition(pa, builder);
setExecutionContext(builder, executionContext);
if (clientInitializer != null) {
clientInitializer.initialize(pa, builder);
}
return builder.buildStreaming();
};

final Publisher<PartitionedServiceDiscovererEvent<R>> psdEvents = psd.discover(buildContext.address())
final Publisher<PartitionedServiceDiscovererEvent<R>> psdEvents = psd.discover(address)
.flatMapConcatIterable(identity());
final HttpHeadersFactory headersFactory = this.headersFactory;
DefaultPartitionedStreamingHttpClientFilter<U, R> partitionedClient =
final DefaultPartitionedStreamingHttpClientFilter<U, R> partitionedClient =
new DefaultPartitionedStreamingHttpClientFilter<>(psdEvents, serviceDiscoveryMaxQueueSize,
clientFactory, partitionAttributesBuilderFactory,
new DefaultStreamingHttpRequestResponseFactory(executionContext.bufferAllocator(),
headersFactory != null ? headersFactory : DefaultHttpHeadersFactory.INSTANCE, HTTP_1_1),
executionContext, partitionMapFactory);

HttpExecutionStrategy computedStrategy =
buildContext.builder.computeChainStrategy(executionContext.executionStrategy());

LOGGER.debug("Client created with base strategy {} → computed strategy {}",
executionContext.executionStrategy(), computedStrategy);

return new FilterableClientToClient(partitionedClient, computedStrategy);
LOGGER.debug("Partitioned client created with base strategy {}", executionContext.executionStrategy());
return new FilterableClientToClient(partitionedClient, executionContext.executionStrategy());
}

private static final class DefaultPartitionedStreamingHttpClientFilter<U, R> implements
Expand Down Expand Up @@ -263,19 +261,19 @@ public StreamingHttpRequest newRequest(final HttpRequestMethod method, final Str

@Override
public PartitionedHttpClientBuilder<U, R> executor(final Executor executor) {
builderTemplate.executor(executor);
executionContextBuilder.executor(executor);
return this;
}

@Override
public PartitionedHttpClientBuilder<U, R> ioExecutor(final IoExecutor ioExecutor) {
builderTemplate.ioExecutor(ioExecutor);
executionContextBuilder.ioExecutor(ioExecutor);
return this;
}

@Override
public PartitionedHttpClientBuilder<U, R> bufferAllocator(final BufferAllocator allocator) {
builderTemplate.bufferAllocator(allocator);
executionContextBuilder.bufferAllocator(allocator);
return this;
}

Expand Down Expand Up @@ -313,7 +311,7 @@ public PartitionedHttpClientBuilder<U, R> initializer(final SingleAddressInitial

@Override
public PartitionedHttpClientBuilder<U, R> executionStrategy(final HttpExecutionStrategy strategy) {
this.builderTemplate.executionStrategy(strategy);
this.executionContextBuilder.executionStrategy(strategy);
return this;
}

Expand Down
Loading