Skip to content

Commit

Permalink
Incorrect state possible after retrying ServiceDiscoverer events
Browse files Browse the repository at this point in the history
Motivation:

Clients have a configurable `serviceDiscovererRetryStrategy` to
guarantee a steady stream of events to the `LoadBalancer` that never
fails. It's necessary at the client level to avoid hanging requests
indefinitely and let requests observe failures from ServiceDiscoverer.
Also, for `PartitionedHttpClient` it's necessary to guarantee that
`GroupedPublisher` never fails.

Retry is effectively a re-subscribe. According to `ServiceDiscoverer`
contract (clarified in apple#3002), each `Subscriber` receives a "state of
the world" as the first collection of events. The problem is that the
state may change significantly between retries, as a result unavailable
addresses can remain inside the `LoadBalancer` forever. Example:

T1. SD delivers [a,b]
T1. LB receives [a,b]
T1. SD delivers error
T2. SD info changed ("a" got revoked)
T3. Client retries SD
T3. SD delivers [b]
T3. LB receives [b] (but still holds "a")

When we retry `ServiceDiscoverer` errors, we should keep pushing deltas
downstream or purge events that are not present in the new "state of the
world".

We previously had this protection but it was mistakenly removed in apple#1949
as part of a broader refactoring around `ServiceDiscoverer` <->
`LoadBalancer` contract.

Modifications:

- Add `RetryingServiceDiscoverer` that handles retries and keeps the
state between retries.
- Use it in `DefaultSingleAddressHttpClientBuilder` and
`DefaultPartitionedHttpClientBuilder`.
- Use `CastedServiceDiscoverer` to allow modifications for
`ServiceDiscovererEvent` after we started to use a wildcard type in
apple#2379.
- Pass consistent `targetResource` identifier to both
`RetryingServiceDiscoverer` and `LoadBalancerFactory` to allow state
correlation when inspecting heap dump.

Result:

Client keeps pushing deltas to `LoadBalancer` after retrying
`ServiceDiscoverer` errors, keeping its state consistent with
`ServiceDiscoverer`.
  • Loading branch information
idelpivnitskiy committed Jul 12, 2024
1 parent 821619a commit d3e55ab
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import static java.util.Objects.requireNonNull;

/**
* Provide a way to describe a partition using a collection of of attributes. Typically only a single type of any
* Provide a way to describe a partition using a collection of attributes. Typically only a single type of any
* particular {@link Key} exists in each {@link PartitionAttributes}. For example:
* <pre>
* { [Key(shard) = "shard X"], [Key(data center) = "data center X"], [Key(is main) = "false/true"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,24 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;

import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE;
import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable;
import static io.servicetalk.concurrent.api.RetryStrategies.retryWithExponentialBackoffFullJitter;
import static io.servicetalk.concurrent.api.Single.defer;
import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.SD_RETRY_STRATEGY_INIT_DURATION;
import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.SD_RETRY_STRATEGY_MAX_DELAY;
import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.setExecutionContext;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;

@Deprecated // FIXME: 0.43 - remove deprecated class
final class DefaultPartitionedHttpClientBuilder<U, R> implements PartitionedHttpClientBuilder<U, R> {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPartitionedHttpClientBuilder.class);
private static final AtomicInteger CLIENT_ID = new AtomicInteger();

private final U address;
private final Function<HttpRequestMetaData, PartitionAttributesBuilder> partitionAttributesBuilderFactory;
Expand Down Expand Up @@ -101,15 +101,11 @@ final class DefaultPartitionedHttpClientBuilder<U, R> implements PartitionedHttp

@Override
public StreamingHttpClient buildStreaming() {
final String targetResource = targetResource(address);
final HttpExecutionContext executionContext = executionContextBuilder.build();
BiIntFunction<Throwable, ? extends Completable> sdRetryStrategy = serviceDiscovererRetryStrategy;
if (sdRetryStrategy == null) {
sdRetryStrategy = retryWithExponentialBackoffFullJitter(__ -> true, SD_RETRY_STRATEGY_INIT_DURATION,
SD_RETRY_STRATEGY_MAX_DELAY, executionContext.executor());
}
final ServiceDiscoverer<U, R, PartitionedServiceDiscovererEvent<R>> psd =
new DefaultSingleAddressHttpClientBuilder.RetryingServiceDiscoverer<>(serviceDiscoverer,
sdRetryStrategy);
new RetryingServiceDiscoverer<>(targetResource, serviceDiscoverer, serviceDiscovererRetryStrategy,
executionContext, DefaultPartitionedHttpClientBuilder::makeUnavailable);

final PartitionedClientFactory<U, R, FilterableStreamingHttpClient> clientFactory = (pa, sd) -> {
// build new context, user may have changed anything on the builder from the filter
Expand Down Expand Up @@ -139,6 +135,30 @@ public StreamingHttpClient buildStreaming() {
return new FilterableClientToClient(partitionedClient, executionContext);
}

private static <U> String targetResource(final U address) {
return address + "/" + CLIENT_ID.incrementAndGet();
}

private static <R> PartitionedServiceDiscovererEvent<R> makeUnavailable(
final PartitionedServiceDiscovererEvent<R> event) {
return new PartitionedServiceDiscovererEvent<R>() {
@Override
public PartitionAttributes partitionAddress() {
return event.partitionAddress();
}

@Override
public R address() {
return event.address();
}

@Override
public Status status() {
return UNAVAILABLE;
}
};
}

private static final class DefaultPartitionedStreamingHttpClientFilter<U, R> implements
FilterableStreamingHttpClient {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.servicetalk.buffer.api.CharSequences;
import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.ConnectionFactoryFilter;
import io.servicetalk.client.api.DefaultServiceDiscovererEvent;
import io.servicetalk.client.api.DelegatingServiceDiscoverer;
import io.servicetalk.client.api.LoadBalancer;
import io.servicetalk.client.api.ServiceDiscoverer;
Expand Down Expand Up @@ -66,17 +67,17 @@

import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.Nullable;

import static io.netty.util.NetUtil.toSocketAddressString;
import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE;
import static io.servicetalk.concurrent.api.AsyncCloseables.newCompositeCloseable;
import static io.servicetalk.concurrent.api.Processors.newCompletableProcessor;
import static io.servicetalk.concurrent.api.RetryStrategies.retryWithExponentialBackoffFullJitter;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.api.HttpExecutionStrategies.offloadNone;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
Expand All @@ -86,7 +87,6 @@
import static io.servicetalk.http.netty.StrategyInfluencerAwareConversions.toConditionalConnectionFilterFactory;
import static java.lang.Integer.parseInt;
import static java.time.Duration.ofMinutes;
import static java.time.Duration.ofSeconds;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -105,9 +105,7 @@ final class DefaultSingleAddressHttpClientBuilder<U, R> implements SingleAddress
new RetryingHttpRequesterFilter.Builder().build();
private static final StreamingHttpConnectionFilterFactory DEFAULT_IDLE_TIMEOUT_FILTER =
new IdleTimeoutConnectionFilter(ofMinutes(5));

static final Duration SD_RETRY_STRATEGY_INIT_DURATION = ofSeconds(2);
static final Duration SD_RETRY_STRATEGY_MAX_DELAY = ofSeconds(128);
private static final AtomicInteger CLIENT_ID = new AtomicInteger();

private final U address;
@Nullable
Expand All @@ -116,7 +114,7 @@ final class DefaultSingleAddressHttpClientBuilder<U, R> implements SingleAddress
final HttpExecutionContextBuilder executionContextBuilder;
private final ClientStrategyInfluencerChainBuilder strategyComputation;
private HttpLoadBalancerFactory<R> loadBalancerFactory;
private ServiceDiscoverer<U, R, ? extends ServiceDiscovererEvent<R>> serviceDiscoverer;
private ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> serviceDiscoverer;
private Function<U, CharSequence> hostToCharSequenceFunction =
DefaultSingleAddressHttpClientBuilder::toAuthorityForm;
private boolean addHostHeaderFallbackFilter = true;
Expand All @@ -142,8 +140,7 @@ final class DefaultSingleAddressHttpClientBuilder<U, R> implements SingleAddress
executionContextBuilder = new HttpExecutionContextBuilder();
strategyComputation = new ClientStrategyInfluencerChainBuilder();
this.loadBalancerFactory = defaultLoadBalancer();
this.serviceDiscoverer = requireNonNull(serviceDiscoverer);

this.serviceDiscoverer = new CastedServiceDiscoverer<>(serviceDiscoverer);
clientFilterFactory = appendFilter(clientFilterFactory, HttpMessageDiscardWatchdogClientFilter.CLIENT_CLEANER);
}

Expand Down Expand Up @@ -176,15 +173,15 @@ static <U, R> SingleAddressHttpClientBuilder<U, R> setExecutionContext(

private static final class HttpClientBuildContext<U, R> {
final DefaultSingleAddressHttpClientBuilder<U, R> builder;
private final ServiceDiscoverer<U, R, ? extends ServiceDiscovererEvent<R>> sd;
private final ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> sd;
private final SdStatusCompletable sdStatus;

@Nullable
private final BiIntFunction<Throwable, ? extends Completable> serviceDiscovererRetryStrategy;

HttpClientBuildContext(
final DefaultSingleAddressHttpClientBuilder<U, R> builder,
final ServiceDiscoverer<U, R, ? extends ServiceDiscovererEvent<R>> sd,
final ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> sd,
@Nullable final BiIntFunction<Throwable, ? extends Completable> serviceDiscovererRetryStrategy) {
this.builder = builder;
this.serviceDiscovererRetryStrategy = serviceDiscovererRetryStrategy;
Expand All @@ -200,17 +197,18 @@ HttpClientConfig httpConfig() {
return builder.config;
}

ServiceDiscoverer<U, R, ? extends ServiceDiscovererEvent<R>> serviceDiscoverer(
HttpExecutionContext executionContext) {
BiIntFunction<Throwable, ? extends Completable> sdRetryStrategy = serviceDiscovererRetryStrategy;
ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> serviceDiscoverer(
final String targetResource, final HttpExecutionContext executionContext) {
final BiIntFunction<Throwable, ? extends Completable> sdRetryStrategy = serviceDiscovererRetryStrategy;
if (sdRetryStrategy == HttpClients.NoRetriesStrategy.INSTANCE) {
return sd;
}
if (sdRetryStrategy == null) {
sdRetryStrategy = retryWithExponentialBackoffFullJitter(__ -> true, SD_RETRY_STRATEGY_INIT_DURATION,
SD_RETRY_STRATEGY_MAX_DELAY, executionContext.executor());
}
return new RetryingServiceDiscoverer<>(new StatusAwareServiceDiscoverer<>(sd, sdStatus), sdRetryStrategy);
return new RetryingServiceDiscoverer<>(targetResource, new StatusAwareServiceDiscoverer<>(sd, sdStatus),
sdRetryStrategy, executionContext, HttpClientBuildContext::makeUnavailable);
}

private static <R> ServiceDiscovererEvent<R> makeUnavailable(final ServiceDiscovererEvent<R> event) {
return new DefaultServiceDiscovererEvent<>(event.address(), UNAVAILABLE);
}
}

Expand All @@ -220,6 +218,7 @@ public StreamingHttpClient buildStreaming() {
}

private static <U, R> StreamingHttpClient buildStreaming(final HttpClientBuildContext<U, R> ctx) {
final String targetResource = targetResource(ctx);
final ReadOnlyHttpClientConfig roConfig = ctx.httpConfig().asReadOnly();
final HttpExecutionContext builderExecutionContext = ctx.builder.executionContextBuilder.build();
final HttpExecutionStrategy computedStrategy =
Expand All @@ -236,7 +235,7 @@ public HttpExecutionStrategy executionStrategy() {
final CompositeCloseable closeOnException = newCompositeCloseable();
try {
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<R>>> sdEvents =
ctx.serviceDiscoverer(executionContext).discover(ctx.address());
ctx.serviceDiscoverer(targetResource, executionContext).discover(ctx.address());

ConnectionFactoryFilter<R, FilterableStreamingHttpConnection> connectionFactoryFilter =
ctx.builder.connectionFactoryFilter;
Expand Down Expand Up @@ -304,9 +303,7 @@ connectionFilterFactory, new AlpnReqRespFactoryFunc(

final LoadBalancer<FilterableStreamingHttpLoadBalancedConnection> lb =
closeOnException.prepend(ctx.builder.loadBalancerFactory.newLoadBalancer(
sdEvents,
connectionFactory,
targetAddress(ctx)));
sdEvents, connectionFactory, targetResource));

ContextAwareStreamingHttpClientFilterFactory currClientFilterFactory = ctx.builder.clientFilterFactory;

Expand Down Expand Up @@ -338,14 +335,14 @@ connectionFilterFactory, new AlpnReqRespFactoryFunc(
builderStrategy.missing(computedStrategy) != offloadNone()) {
LOGGER.info("Client for {} created with the builder strategy {} but resulting computed strategy is " +
"{}. One of the filters enforces additional offloading. To find out what filter is " +
"it, enable debug level logging for {}.", targetAddress(ctx), builderStrategy,
"it, enable debug level logging for {}.", targetResource, builderStrategy,
computedStrategy, ClientStrategyInfluencerChainBuilder.class);
} else if (builderStrategy == computedStrategy) {
LOGGER.debug("Client for {} created with the execution strategy {}.",
targetAddress(ctx), computedStrategy);
targetResource, computedStrategy);
} else {
LOGGER.debug("Client for {} created with the builder strategy {}, resulting computed strategy is {}.",
targetAddress(ctx), builderStrategy, computedStrategy);
targetResource, builderStrategy, computedStrategy);
}
return new FilterableClientToClient(wrappedClient, executionContext);
} catch (final Throwable t) {
Expand Down Expand Up @@ -392,10 +389,14 @@ private static StreamingHttpRequestResponseFactory defaultReqRespFactory(ReadOnl
}
}

private static <U, R> String targetAddress(final HttpClientBuildContext<U, R> ctx) {
assert ctx.builder.address != null;
return ctx.builder.proxyAddress == null ?
ctx.builder.address.toString() : ctx.builder.address + " (via " + ctx.builder.proxyAddress + ")";
/**
* This method is used to create a "targetResource" identifier that helps us to correlate internal state of the
* ServiceDiscoveryRetryStrategy and LoadBalancer.
*/
private static <U, R> String targetResource(final HttpClientBuildContext<U, R> ctx) {
final String uniqueAddress = ctx.builder.address + "/" + CLIENT_ID.incrementAndGet();
return ctx.builder.proxyAddress == null ? uniqueAddress :
uniqueAddress + " (via " + ctx.builder.proxyAddress + ")";
}

private static ContextAwareStreamingHttpClientFilterFactory appendFilter(
Expand Down Expand Up @@ -601,7 +602,7 @@ public DefaultSingleAddressHttpClientBuilder<U, R> appendClientFilter(
@Override
public DefaultSingleAddressHttpClientBuilder<U, R> serviceDiscoverer(
final ServiceDiscoverer<U, R, ? extends ServiceDiscovererEvent<R>> serviceDiscoverer) {
this.serviceDiscoverer = requireNonNull(serviceDiscoverer);
this.serviceDiscoverer = new CastedServiceDiscoverer<>(serviceDiscoverer);
return this;
}

Expand Down Expand Up @@ -751,23 +752,6 @@ public Publisher<Collection<E>> discover(final U u) {
}
}

static final class RetryingServiceDiscoverer<U, R, E extends ServiceDiscovererEvent<R>>
extends DelegatingServiceDiscoverer<U, R, E> {
private final BiIntFunction<Throwable, ? extends Completable> retryStrategy;

RetryingServiceDiscoverer(final ServiceDiscoverer<U, R, E> delegate,
final BiIntFunction<Throwable, ? extends Completable> retryStrategy) {
super(delegate);
this.retryStrategy = requireNonNull(retryStrategy);
}

@Override
public Publisher<Collection<E>> discover(final U u) {
// terminateOnNextException false -> LB is after this operator, if LB throws do best effort retry.
return delegate().discover(u).retryWhen(false, retryStrategy);
}
}

private static final class AlpnReqRespFactoryFunc implements
Function<HttpProtocolVersion, StreamingHttpRequestResponseFactory> {
private final BufferAllocator allocator;
Expand Down Expand Up @@ -839,4 +823,47 @@ private static <ResolvedAddress> HttpLoadBalancerFactory<ResolvedAddress> defaul
RoundRobinLoadBalancers.<ResolvedAddress, FilterableStreamingHttpLoadBalancedConnection>builder(
DefaultHttpLoadBalancerFactory.class.getSimpleName()).build());
}

// Because of the change in https://github.com/apple/servicetalk/pull/2379, we should constrain the type back to
// ServiceDiscovererEvent without "? extends" to allow RetryingServiceDiscoverer to mark events as UNAVAILABLE.
private static final class CastedServiceDiscoverer<U, R>
implements ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> {

private final ServiceDiscoverer<U, R, ? extends ServiceDiscovererEvent<R>> delegate;

private CastedServiceDiscoverer(final ServiceDiscoverer<U, R, ? extends ServiceDiscovererEvent<R>> delegate) {
this.delegate = requireNonNull(delegate);
}

@Override
@SuppressWarnings("unchecked")
public Publisher<Collection<ServiceDiscovererEvent<R>>> discover(final U address) {
return delegate.discover(address).map(e -> (Collection<ServiceDiscovererEvent<R>>) e);
}

@Override
public Completable closeAsync() {
return delegate.closeAsync();
}

@Override
public Completable closeAsyncGracefully() {
return delegate.closeAsyncGracefully();
}

@Override
public Completable onClose() {
return delegate.onClose();
}

@Override
public Completable onClosing() {
return delegate.onClosing();
}

@Override
public String toString() {
return delegate.toString();
}
}
}
Loading

0 comments on commit d3e55ab

Please sign in to comment.