From d3e55abd2a75091ae955438b07ef0c82ca5e1c26 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Fri, 12 Jul 2024 10:42:35 -0700 Subject: [PATCH] Incorrect state possible after retrying `ServiceDiscoverer` events Motivation: Clients have a configurable `serviceDiscovererRetryStrategy` to guarantee a steady stream of events to the `LoadBalancer` that never fails. It's necessary at the client level to avoid hanging requests indefinitely and let requests observe failures from ServiceDiscoverer. Also, for `PartitionedHttpClient` it's necessary to guarantee that `GroupedPublisher` never fails. Retry is effectively a re-subscribe. According to `ServiceDiscoverer` contract (clarified in #3002), each `Subscriber` receives a "state of the world" as the first collection of events. The problem is that the state may change significantly between retries, as a result unavailable addresses can remain inside the `LoadBalancer` forever. Example: T1. SD delivers [a,b] T1. LB receives [a,b] T1. SD delivers error T2. SD info changed ("a" got revoked) T3. Client retries SD T3. SD delivers [b] T3. LB receives [b] (but still holds "a") When we retry `ServiceDiscoverer` errors, we should keep pushing deltas downstream or purge events that are not present in the new "state of the world". We previously had this protection but it was mistakenly removed in #1949 as part of a broader refactoring around `ServiceDiscoverer` <-> `LoadBalancer` contract. Modifications: - Add `RetryingServiceDiscoverer` that handles retries and keeps the state between retries. - Use it in `DefaultSingleAddressHttpClientBuilder` and `DefaultPartitionedHttpClientBuilder`. - Use `CastedServiceDiscoverer` to allow modifications for `ServiceDiscovererEvent` after we started to use a wildcard type in #2379. - Pass consistent `targetResource` identifier to both `RetryingServiceDiscoverer` and `LoadBalancerFactory` to allow state correlation when inspecting heap dump. Result: Client keeps pushing deltas to `LoadBalancer` after retrying `ServiceDiscoverer` errors, keeping its state consistent with `ServiceDiscoverer`. --- .../api/partition/PartitionAttributes.java | 2 +- .../DefaultPartitionedHttpClientBuilder.java | 40 +++-- ...DefaultSingleAddressHttpClientBuilder.java | 123 +++++++++------ .../http/netty/RetryingServiceDiscoverer.java | 149 ++++++++++++++++++ .../http/netty/PartitionedHttpClientTest.java | 1 + 5 files changed, 256 insertions(+), 59 deletions(-) create mode 100644 servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingServiceDiscoverer.java diff --git a/servicetalk-client-api/src/main/java/io/servicetalk/client/api/partition/PartitionAttributes.java b/servicetalk-client-api/src/main/java/io/servicetalk/client/api/partition/PartitionAttributes.java index 3020794411..b1382d650c 100644 --- a/servicetalk-client-api/src/main/java/io/servicetalk/client/api/partition/PartitionAttributes.java +++ b/servicetalk-client-api/src/main/java/io/servicetalk/client/api/partition/PartitionAttributes.java @@ -26,7 +26,7 @@ import static java.util.Objects.requireNonNull; /** - * Provide a way to describe a partition using a collection of of attributes. Typically only a single type of any + * Provide a way to describe a partition using a collection of attributes. Typically only a single type of any * particular {@link Key} exists in each {@link PartitionAttributes}. For example: *
  * { [Key(shard) = "shard X"], [Key(data center) = "data center X"], [Key(is main) = "false/true"] }
diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultPartitionedHttpClientBuilder.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultPartitionedHttpClientBuilder.java
index a964ef548b..faadce37b5 100644
--- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultPartitionedHttpClientBuilder.java
+++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultPartitionedHttpClientBuilder.java
@@ -55,17 +55,16 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 
+import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE;
 import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable;
-import static io.servicetalk.concurrent.api.RetryStrategies.retryWithExponentialBackoffFullJitter;
 import static io.servicetalk.concurrent.api.Single.defer;
 import static io.servicetalk.concurrent.api.Single.failed;
 import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
-import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.SD_RETRY_STRATEGY_INIT_DURATION;
-import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.SD_RETRY_STRATEGY_MAX_DELAY;
 import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.setExecutionContext;
 import static java.util.Objects.requireNonNull;
 import static java.util.function.Function.identity;
@@ -73,6 +72,7 @@
 @Deprecated // FIXME: 0.43 - remove deprecated class
 final class DefaultPartitionedHttpClientBuilder implements PartitionedHttpClientBuilder {
     private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPartitionedHttpClientBuilder.class);
+    private static final AtomicInteger CLIENT_ID = new AtomicInteger();
 
     private final U address;
     private final Function partitionAttributesBuilderFactory;
@@ -101,15 +101,11 @@ final class DefaultPartitionedHttpClientBuilder implements PartitionedHttp
 
     @Override
     public StreamingHttpClient buildStreaming() {
+        final String targetResource = targetResource(address);
         final HttpExecutionContext executionContext = executionContextBuilder.build();
-        BiIntFunction sdRetryStrategy = serviceDiscovererRetryStrategy;
-        if (sdRetryStrategy == null) {
-            sdRetryStrategy = retryWithExponentialBackoffFullJitter(__ -> true, SD_RETRY_STRATEGY_INIT_DURATION,
-                    SD_RETRY_STRATEGY_MAX_DELAY, executionContext.executor());
-        }
         final ServiceDiscoverer> psd =
-                new DefaultSingleAddressHttpClientBuilder.RetryingServiceDiscoverer<>(serviceDiscoverer,
-                        sdRetryStrategy);
+                new RetryingServiceDiscoverer<>(targetResource, serviceDiscoverer, serviceDiscovererRetryStrategy,
+                        executionContext, DefaultPartitionedHttpClientBuilder::makeUnavailable);
 
         final PartitionedClientFactory clientFactory = (pa, sd) -> {
             // build new context, user may have changed anything on the builder from the filter
@@ -139,6 +135,30 @@ public StreamingHttpClient buildStreaming() {
         return new FilterableClientToClient(partitionedClient, executionContext);
     }
 
+    private static  String targetResource(final U address) {
+        return address + "/" + CLIENT_ID.incrementAndGet();
+    }
+
+    private static  PartitionedServiceDiscovererEvent makeUnavailable(
+            final PartitionedServiceDiscovererEvent event) {
+        return new PartitionedServiceDiscovererEvent() {
+            @Override
+            public PartitionAttributes partitionAddress() {
+                return event.partitionAddress();
+            }
+
+            @Override
+            public R address() {
+                return event.address();
+            }
+
+            @Override
+            public Status status() {
+                return UNAVAILABLE;
+            }
+        };
+    }
+
     private static final class DefaultPartitionedStreamingHttpClientFilter implements
                                                                                  FilterableStreamingHttpClient {
 
diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java
index ce5e3cb94b..27a0e1d60a 100644
--- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java
+++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java
@@ -19,6 +19,7 @@
 import io.servicetalk.buffer.api.CharSequences;
 import io.servicetalk.client.api.ConnectionFactory;
 import io.servicetalk.client.api.ConnectionFactoryFilter;
+import io.servicetalk.client.api.DefaultServiceDiscovererEvent;
 import io.servicetalk.client.api.DelegatingServiceDiscoverer;
 import io.servicetalk.client.api.LoadBalancer;
 import io.servicetalk.client.api.ServiceDiscoverer;
@@ -66,17 +67,17 @@
 
 import java.net.InetSocketAddress;
 import java.net.SocketOption;
-import java.time.Duration;
 import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BooleanSupplier;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import javax.annotation.Nullable;
 
 import static io.netty.util.NetUtil.toSocketAddressString;
+import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE;
 import static io.servicetalk.concurrent.api.AsyncCloseables.newCompositeCloseable;
 import static io.servicetalk.concurrent.api.Processors.newCompletableProcessor;
-import static io.servicetalk.concurrent.api.RetryStrategies.retryWithExponentialBackoffFullJitter;
 import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
 import static io.servicetalk.http.api.HttpExecutionStrategies.offloadNone;
 import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
@@ -86,7 +87,6 @@
 import static io.servicetalk.http.netty.StrategyInfluencerAwareConversions.toConditionalConnectionFilterFactory;
 import static java.lang.Integer.parseInt;
 import static java.time.Duration.ofMinutes;
-import static java.time.Duration.ofSeconds;
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -105,9 +105,7 @@ final class DefaultSingleAddressHttpClientBuilder implements SingleAddress
             new RetryingHttpRequesterFilter.Builder().build();
     private static final StreamingHttpConnectionFilterFactory DEFAULT_IDLE_TIMEOUT_FILTER =
             new IdleTimeoutConnectionFilter(ofMinutes(5));
-
-    static final Duration SD_RETRY_STRATEGY_INIT_DURATION = ofSeconds(2);
-    static final Duration SD_RETRY_STRATEGY_MAX_DELAY = ofSeconds(128);
+    private static final AtomicInteger CLIENT_ID = new AtomicInteger();
 
     private final U address;
     @Nullable
@@ -116,7 +114,7 @@ final class DefaultSingleAddressHttpClientBuilder implements SingleAddress
     final HttpExecutionContextBuilder executionContextBuilder;
     private final ClientStrategyInfluencerChainBuilder strategyComputation;
     private HttpLoadBalancerFactory loadBalancerFactory;
-    private ServiceDiscoverer> serviceDiscoverer;
+    private ServiceDiscoverer> serviceDiscoverer;
     private Function hostToCharSequenceFunction =
             DefaultSingleAddressHttpClientBuilder::toAuthorityForm;
     private boolean addHostHeaderFallbackFilter = true;
@@ -142,8 +140,7 @@ final class DefaultSingleAddressHttpClientBuilder implements SingleAddress
         executionContextBuilder = new HttpExecutionContextBuilder();
         strategyComputation = new ClientStrategyInfluencerChainBuilder();
         this.loadBalancerFactory = defaultLoadBalancer();
-        this.serviceDiscoverer = requireNonNull(serviceDiscoverer);
-
+        this.serviceDiscoverer = new CastedServiceDiscoverer<>(serviceDiscoverer);
         clientFilterFactory = appendFilter(clientFilterFactory, HttpMessageDiscardWatchdogClientFilter.CLIENT_CLEANER);
     }
 
@@ -176,7 +173,7 @@ static  SingleAddressHttpClientBuilder setExecutionContext(
 
     private static final class HttpClientBuildContext {
         final DefaultSingleAddressHttpClientBuilder builder;
-        private final ServiceDiscoverer> sd;
+        private final ServiceDiscoverer> sd;
         private final SdStatusCompletable sdStatus;
 
         @Nullable
@@ -184,7 +181,7 @@ private static final class HttpClientBuildContext {
 
         HttpClientBuildContext(
                 final DefaultSingleAddressHttpClientBuilder builder,
-                final ServiceDiscoverer> sd,
+                final ServiceDiscoverer> sd,
                 @Nullable final BiIntFunction serviceDiscovererRetryStrategy) {
             this.builder = builder;
             this.serviceDiscovererRetryStrategy = serviceDiscovererRetryStrategy;
@@ -200,17 +197,18 @@ HttpClientConfig httpConfig() {
             return builder.config;
         }
 
-        ServiceDiscoverer> serviceDiscoverer(
-                HttpExecutionContext executionContext) {
-            BiIntFunction sdRetryStrategy = serviceDiscovererRetryStrategy;
+        ServiceDiscoverer> serviceDiscoverer(
+                final String targetResource, final HttpExecutionContext executionContext) {
+            final BiIntFunction sdRetryStrategy = serviceDiscovererRetryStrategy;
             if (sdRetryStrategy == HttpClients.NoRetriesStrategy.INSTANCE) {
                 return sd;
             }
-            if (sdRetryStrategy == null) {
-                sdRetryStrategy = retryWithExponentialBackoffFullJitter(__ -> true, SD_RETRY_STRATEGY_INIT_DURATION,
-                        SD_RETRY_STRATEGY_MAX_DELAY, executionContext.executor());
-            }
-            return new RetryingServiceDiscoverer<>(new StatusAwareServiceDiscoverer<>(sd, sdStatus), sdRetryStrategy);
+            return new RetryingServiceDiscoverer<>(targetResource, new StatusAwareServiceDiscoverer<>(sd, sdStatus),
+                    sdRetryStrategy, executionContext, HttpClientBuildContext::makeUnavailable);
+        }
+
+        private static  ServiceDiscovererEvent makeUnavailable(final ServiceDiscovererEvent event) {
+            return new DefaultServiceDiscovererEvent<>(event.address(), UNAVAILABLE);
         }
     }
 
@@ -220,6 +218,7 @@ public StreamingHttpClient buildStreaming() {
     }
 
     private static  StreamingHttpClient buildStreaming(final HttpClientBuildContext ctx) {
+        final String targetResource = targetResource(ctx);
         final ReadOnlyHttpClientConfig roConfig = ctx.httpConfig().asReadOnly();
         final HttpExecutionContext builderExecutionContext = ctx.builder.executionContextBuilder.build();
         final HttpExecutionStrategy computedStrategy =
@@ -236,7 +235,7 @@ public HttpExecutionStrategy executionStrategy() {
         final CompositeCloseable closeOnException = newCompositeCloseable();
         try {
             final Publisher>> sdEvents =
-                    ctx.serviceDiscoverer(executionContext).discover(ctx.address());
+                    ctx.serviceDiscoverer(targetResource, executionContext).discover(ctx.address());
 
             ConnectionFactoryFilter connectionFactoryFilter =
                     ctx.builder.connectionFactoryFilter;
@@ -304,9 +303,7 @@ connectionFilterFactory, new AlpnReqRespFactoryFunc(
 
             final LoadBalancer lb =
                     closeOnException.prepend(ctx.builder.loadBalancerFactory.newLoadBalancer(
-                            sdEvents,
-                            connectionFactory,
-                            targetAddress(ctx)));
+                            sdEvents, connectionFactory, targetResource));
 
             ContextAwareStreamingHttpClientFilterFactory currClientFilterFactory = ctx.builder.clientFilterFactory;
 
@@ -338,14 +335,14 @@ connectionFilterFactory, new AlpnReqRespFactoryFunc(
                     builderStrategy.missing(computedStrategy) != offloadNone()) {
                 LOGGER.info("Client for {} created with the builder strategy {} but resulting computed strategy is " +
                                 "{}. One of the filters enforces additional offloading. To find out what filter is " +
-                                "it, enable debug level logging for {}.", targetAddress(ctx), builderStrategy,
+                                "it, enable debug level logging for {}.", targetResource, builderStrategy,
                         computedStrategy, ClientStrategyInfluencerChainBuilder.class);
             } else if (builderStrategy == computedStrategy) {
                 LOGGER.debug("Client for {} created with the execution strategy {}.",
-                        targetAddress(ctx), computedStrategy);
+                        targetResource, computedStrategy);
             } else {
                 LOGGER.debug("Client for {} created with the builder strategy {}, resulting computed strategy is {}.",
-                        targetAddress(ctx), builderStrategy, computedStrategy);
+                        targetResource, builderStrategy, computedStrategy);
             }
             return new FilterableClientToClient(wrappedClient, executionContext);
         } catch (final Throwable t) {
@@ -392,10 +389,14 @@ private static StreamingHttpRequestResponseFactory defaultReqRespFactory(ReadOnl
         }
     }
 
-    private static  String targetAddress(final HttpClientBuildContext ctx) {
-        assert ctx.builder.address != null;
-        return ctx.builder.proxyAddress == null ?
-                ctx.builder.address.toString() : ctx.builder.address + " (via " + ctx.builder.proxyAddress + ")";
+    /**
+     * This method is used to create a "targetResource" identifier that helps us to correlate internal state of the
+     * ServiceDiscoveryRetryStrategy and LoadBalancer.
+     */
+    private static  String targetResource(final HttpClientBuildContext ctx) {
+        final String uniqueAddress = ctx.builder.address + "/" + CLIENT_ID.incrementAndGet();
+        return ctx.builder.proxyAddress == null ? uniqueAddress :
+                uniqueAddress + " (via " + ctx.builder.proxyAddress + ")";
     }
 
     private static ContextAwareStreamingHttpClientFilterFactory appendFilter(
@@ -601,7 +602,7 @@ public DefaultSingleAddressHttpClientBuilder appendClientFilter(
     @Override
     public DefaultSingleAddressHttpClientBuilder serviceDiscoverer(
             final ServiceDiscoverer> serviceDiscoverer) {
-        this.serviceDiscoverer = requireNonNull(serviceDiscoverer);
+        this.serviceDiscoverer = new CastedServiceDiscoverer<>(serviceDiscoverer);
         return this;
     }
 
@@ -751,23 +752,6 @@ public Publisher> discover(final U u) {
         }
     }
 
-    static final class RetryingServiceDiscoverer>
-            extends DelegatingServiceDiscoverer {
-        private final BiIntFunction retryStrategy;
-
-        RetryingServiceDiscoverer(final ServiceDiscoverer delegate,
-                                  final BiIntFunction retryStrategy) {
-            super(delegate);
-            this.retryStrategy = requireNonNull(retryStrategy);
-        }
-
-        @Override
-        public Publisher> discover(final U u) {
-            // terminateOnNextException false -> LB is after this operator, if LB throws do best effort retry.
-            return delegate().discover(u).retryWhen(false, retryStrategy);
-        }
-    }
-
     private static final class AlpnReqRespFactoryFunc implements
                                                   Function {
         private final BufferAllocator allocator;
@@ -839,4 +823,47 @@ private static  HttpLoadBalancerFactory defaul
                 RoundRobinLoadBalancers.builder(
                                 DefaultHttpLoadBalancerFactory.class.getSimpleName()).build());
     }
+
+    // Because of the change in https://github.com/apple/servicetalk/pull/2379, we should constrain the type back to
+    // ServiceDiscovererEvent without "? extends" to allow RetryingServiceDiscoverer to mark events as UNAVAILABLE.
+    private static final class CastedServiceDiscoverer
+            implements ServiceDiscoverer> {
+
+        private final ServiceDiscoverer> delegate;
+
+        private CastedServiceDiscoverer(final ServiceDiscoverer> delegate) {
+            this.delegate = requireNonNull(delegate);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public Publisher>> discover(final U address) {
+            return delegate.discover(address).map(e -> (Collection>) e);
+        }
+
+        @Override
+        public Completable closeAsync() {
+            return delegate.closeAsync();
+        }
+
+        @Override
+        public Completable closeAsyncGracefully() {
+            return delegate.closeAsyncGracefully();
+        }
+
+        @Override
+        public Completable onClose() {
+            return delegate.onClose();
+        }
+
+        @Override
+        public Completable onClosing() {
+            return delegate.onClosing();
+        }
+
+        @Override
+        public String toString() {
+            return delegate.toString();
+        }
+    }
 }
diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingServiceDiscoverer.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingServiceDiscoverer.java
new file mode 100644
index 0000000000..eb6ea5d30f
--- /dev/null
+++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingServiceDiscoverer.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright © 2024 Apple Inc. and the ServiceTalk project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.servicetalk.http.netty;
+
+import io.servicetalk.client.api.DelegatingServiceDiscoverer;
+import io.servicetalk.client.api.ServiceDiscoverer;
+import io.servicetalk.client.api.ServiceDiscovererEvent;
+import io.servicetalk.concurrent.api.BiIntFunction;
+import io.servicetalk.concurrent.api.Completable;
+import io.servicetalk.concurrent.api.Publisher;
+import io.servicetalk.http.api.HttpExecutionContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.UnaryOperator;
+import javax.annotation.Nullable;
+
+import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE;
+import static io.servicetalk.concurrent.api.RetryStrategies.retryWithExponentialBackoffFullJitter;
+import static java.time.Duration.ofSeconds;
+import static java.util.Collections.emptyMap;
+
+final class RetryingServiceDiscoverer>
+        extends DelegatingServiceDiscoverer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RetryingServiceDiscoverer.class);
+
+    private static final Duration SD_RETRY_STRATEGY_INIT_DURATION = ofSeconds(2);
+    private static final Duration SD_RETRY_STRATEGY_MAX_DELAY = ofSeconds(128);
+
+    private final String targetResource;
+    private final BiIntFunction retryStrategy;
+    private final UnaryOperator makeUnavailable;
+
+    RetryingServiceDiscoverer(final String targetResource,
+                              final ServiceDiscoverer delegate,
+                              @Nullable BiIntFunction retryStrategy,
+                              final HttpExecutionContext executionContext,
+                              final UnaryOperator makeUnavailable) {
+        super(delegate);
+        this.targetResource = targetResource;
+        if (retryStrategy == null) {
+            retryStrategy = retryWithExponentialBackoffFullJitter(__ -> true, SD_RETRY_STRATEGY_INIT_DURATION,
+                    SD_RETRY_STRATEGY_MAX_DELAY, executionContext.executor());
+        }
+        this.retryStrategy = retryStrategy;
+        this.makeUnavailable = makeUnavailable;
+    }
+
+    @Override
+    public Publisher> discover(final U address) {
+        // The returned publisher is guaranteed to never fail because we retry all errors here. However, LoadBalancer
+        // can still cancel and re-subscribe in attempt to recover from unhealthy state. In this case, we need to
+        // re-initialize the ServiceDiscovererEventsCache and restart from an empty state.
+        return Publisher.defer(() -> {
+            final ServiceDiscovererEventsCache eventsCache =
+                    new ServiceDiscovererEventsCache<>(targetResource, makeUnavailable);
+            return delegate().discover(address)
+                    .map(eventsCache::consumeAndFilter)
+                    .beforeOnError(eventsCache::errorSeen)
+                    // terminateOnNextException false -> LB is after this operator, if LB throws do best effort retry
+                    .retryWhen(false, retryStrategy);
+        });
+    }
+
+    private static final class ServiceDiscovererEventsCache> {
+        @SuppressWarnings("rawtypes")
+        private static final Map NONE_RETAINED = emptyMap();
+
+        private final String targetResource;
+        private final UnaryOperator makeUnavailable;
+        private final Map currentState = new HashMap<>();
+        private Map retainedState = noneRetained();
+
+        private ServiceDiscovererEventsCache(final String targetResource, final UnaryOperator makeUnavailable) {
+            this.targetResource = targetResource;
+            this.makeUnavailable = makeUnavailable;
+        }
+
+        void errorSeen(final Throwable t) {
+            if (retainedState == NONE_RETAINED) {
+                retainedState = new HashMap<>(currentState);
+                currentState.clear();
+            }
+            LOGGER.debug("{} observed an error from ServiceDiscoverer", targetResource, t);
+        }
+
+        Collection consumeAndFilter(final Collection events) {
+            if (retainedState == NONE_RETAINED) {
+                for (E event : events) {
+                    if (UNAVAILABLE.equals(event.status())) {
+                        currentState.remove(event.address());
+                    } else {
+                        currentState.put(event.address(), event);
+                    }
+                }
+                return events;
+            }
+
+            // We have seen an error and re-subscribed upon retry. Based on the Publisher rule 1.10
+            // (https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#1.10), each subscribe
+            // expects a different Subscriber. Therefore, discovery Publisher suppose to start from a fresh state. We
+            // should populate currentState with new addresses and deactivate the ones which are not present in the new
+            // collection, but were left in retainedState.
+            assert currentState.isEmpty();
+            final List toReturn = new ArrayList<>(events.size() + retainedState.size());
+            for (E event : events) {
+                final R address = event.address();
+                toReturn.add(event);
+                retainedState.remove(address);
+                if (!UNAVAILABLE.equals(event.status())) {
+                    currentState.put(address, event);
+                }
+            }
+
+            for (E event : retainedState.values()) {
+                assert event.status() != UNAVAILABLE;
+                toReturn.add(makeUnavailable.apply(event));
+            }
+
+            retainedState = noneRetained();
+            return toReturn;
+        }
+
+        @SuppressWarnings("unchecked")
+        private static > Map noneRetained() {
+            return NONE_RETAINED;
+        }
+    }
+}
diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/PartitionedHttpClientTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/PartitionedHttpClientTest.java
index 674f1376ea..be8616722d 100644
--- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/PartitionedHttpClientTest.java
+++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/PartitionedHttpClientTest.java
@@ -67,6 +67,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+@SuppressWarnings("deprecation")
 class PartitionedHttpClientTest {
     private static final PartitionAttributes.Key SRV_NAME = PartitionAttributes.Key.newKey();
     private static final PartitionAttributes.Key SRV_LEADER = PartitionAttributes.Key.newKey();