From b5636d9647c6ec307e9b12e91026a938979b764d Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Tue, 19 Sep 2023 09:10:25 -0700 Subject: [PATCH] Use Publisher#replay operator Motivation: Publisher#replay was recently added, and there are use cases that can be simplified/improved by using this new operator. --- .../servicetalk/client/api/LoadBalancer.java | 3 + .../grpc/health/DefaultHealthService.java | 42 +++++---- .../AbstractLBHttpConnectionFactory.java | 4 - .../AbstractStreamingHttpConnection.java | 4 +- .../H2ClientParentConnectionContext.java | 5 +- .../netty/LoadBalancerReadySubscriber.java | 87 ------------------- .../netty/RetryingHttpRequesterFilter.java | 78 ++++++++--------- .../H2PriorKnowledgeFeatureParityTest.java | 16 +--- .../LoadBalancerReadySubscriberTest.java | 57 ------------ ...equesterFilterAutoRetryStrategiesTest.java | 5 +- .../loadbalancer/RoundRobinLoadBalancer.java | 5 +- 11 files changed, 82 insertions(+), 224 deletions(-) delete mode 100644 servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancerReadySubscriber.java delete mode 100644 servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/LoadBalancerReadySubscriberTest.java diff --git a/servicetalk-client-api/src/main/java/io/servicetalk/client/api/LoadBalancer.java b/servicetalk-client-api/src/main/java/io/servicetalk/client/api/LoadBalancer.java index 2cebfe4647..df297d9d67 100644 --- a/servicetalk-client-api/src/main/java/io/servicetalk/client/api/LoadBalancer.java +++ b/servicetalk-client-api/src/main/java/io/servicetalk/client/api/LoadBalancer.java @@ -95,6 +95,9 @@ default Single newConnection(@Nullable ContextMap context) { /** * A {@link Publisher} of events provided by this {@link LoadBalancer}. This maybe used to broadcast internal state * of this {@link LoadBalancer} to provide hints/visibility for external usage. + *

+ * Note the {@link Publisher} maybe subscribed to multiple times. Using operators such as + * {@link Publisher#replay(int)} is recommended. * @return A {@link Publisher} of events provided by this {@link LoadBalancer}. */ Publisher eventStream(); diff --git a/servicetalk-grpc-health/src/main/java/io/servicetalk/grpc/health/DefaultHealthService.java b/servicetalk-grpc-health/src/main/java/io/servicetalk/grpc/health/DefaultHealthService.java index b7608bc134..c1331941e3 100644 --- a/servicetalk-grpc-health/src/main/java/io/servicetalk/grpc/health/DefaultHealthService.java +++ b/servicetalk-grpc-health/src/main/java/io/servicetalk/grpc/health/DefaultHealthService.java @@ -75,7 +75,7 @@ public DefaultHealthService() { */ public DefaultHealthService(Predicate watchAllowed) { this.watchAllowed = requireNonNull(watchAllowed); - serviceToStatusMap.put(OVERALL_SERVICE_NAME, new HealthValue(SERVING)); + serviceToStatusMap.put(OVERALL_SERVICE_NAME, HealthValue.newInstance(SERVING)); } @Override @@ -85,7 +85,7 @@ public Single check(final GrpcServiceContext ctx, final Hea return Single.failed(new GrpcStatusException( new GrpcStatus(NOT_FOUND, "unknown service: " + request.getService()))); } - return Single.succeeded(health.last); + return health.publisher.takeAtMost(1).firstOrError(); } @Override @@ -103,13 +103,13 @@ public Publisher watch(final GrpcServiceContext ctx, final return Publisher.from(newBuilder().setStatus(NOT_SERVING).build()); } healthValue = serviceToStatusMap.computeIfAbsent(request.getService(), - __ -> new HealthValue(SERVICE_UNKNOWN)); + __ -> HealthValue.newInstance(SERVICE_UNKNOWN)); } finally { lock.unlock(); } } - return Publisher.from(healthValue.last).concat(healthValue.publisher); + return healthValue.publisher; } /** @@ -130,7 +130,7 @@ public boolean setStatus(String service, ServingStatus status) { return false; } resp = newBuilder().setStatus(status).build(); - healthValue = serviceToStatusMap.computeIfAbsent(service, __ -> new HealthValue(resp)); + healthValue = serviceToStatusMap.computeIfAbsent(service, __ -> new HealthValue()); } finally { lock.unlock(); } @@ -181,24 +181,29 @@ public boolean terminate() { private static final class HealthValue { private final Processor processor; private final Publisher publisher; - private volatile HealthCheckResponse last; - private HealthValue(final HealthCheckResponse initialState) { + HealthValue() { this.processor = newPublisherProcessorDropHeadOnOverflow(4); this.publisher = fromSource(processor) - // Allow multiple subscribers to Subscribe to the resulting Publisher. - .multicast(1, false); - this.last = initialState; + // Allow multiple subscribers to Subscribe to the resulting Publisher, use a history of 1 + // so each new subscriber gets the latest state. + .replay(1); + // Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest + // signal. + publisher.ignoreElements().subscribe(); } - private HealthValue(final ServingStatus status) { - this(newBuilder().setStatus(status).build()); + static HealthValue newInstance(final HealthCheckResponse initialState) { + HealthValue value = new HealthValue(); + value.next(initialState); + return value; + } + + static HealthValue newInstance(final ServingStatus status) { + return newInstance(newBuilder().setStatus(status).build()); } void next(HealthCheckResponse response) { - // Set the status here instead of in an operator because we need the status to be updated regardless if - // anyone is consuming the status. - last = response; processor.onNext(response); } @@ -208,7 +213,12 @@ void next(HealthCheckResponse response) { * @param status The last status to set. */ void completeMultipleTerminalSafe(ServingStatus status) { - next(newBuilder().setStatus(status).build()); + try { + next(newBuilder().setStatus(status).build()); + } catch (Throwable cause) { + processor.onError(cause); + return; + } processor.onComplete(); } } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractLBHttpConnectionFactory.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractLBHttpConnectionFactory.java index da9c05c61f..7b36b34ef0 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractLBHttpConnectionFactory.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractLBHttpConnectionFactory.java @@ -34,9 +34,6 @@ import io.servicetalk.transport.api.TransportObserver; import io.servicetalk.transport.netty.internal.NoopTransportObserver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.function.Function; import javax.annotation.Nullable; @@ -49,7 +46,6 @@ abstract class AbstractLBHttpConnectionFactory implements ConnectionFactory { - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLBHttpConnectionFactory.class); @Nullable private final StreamingHttpConnectionFilterFactory connectionFilterFunction; final ReadOnlyHttpClientConfig config; diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractStreamingHttpConnection.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractStreamingHttpConnection.java index 6c22f82953..1ac0bab2be 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractStreamingHttpConnection.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractStreamingHttpConnection.java @@ -90,7 +90,9 @@ abstract class AbstractStreamingHttpConnection(maxPipelinedRequests)) .concat(connection.onClosing()) .concat(succeeded(ZERO_MAX_CONCURRENCY_EVENT)) - .multicast(1); // Allows multiple Subscribers to consume the event stream. + .replay(1); // Allow multiple Subscribers to consume, new Subscribers get last signal. + // Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest signal. + maxConcurrencySetting.ignoreElements().subscribe(); this.headersFactory = headersFactory; this.allowDropTrailersReadFromTransport = allowDropTrailersReadFromTransport; } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java index 44703eab4b..cfae51840d 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java @@ -192,7 +192,10 @@ private static final class DefaultH2ClientParentConnection extends AbstractH2Par maxConcurrencyProcessor.onNext(DEFAULT_H2_MAX_CONCURRENCY_EVENT); bs = new Http2StreamChannelBootstrap(connection.channel()); maxConcurrencyPublisher = fromSource(maxConcurrencyProcessor) - .multicast(1); // Allows multiple Subscribers to consume the event stream. + .replay(1); // Allow multiple Subscribers to consume, new Subscribers get last signal. + // Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest + // signal. + maxConcurrencyPublisher.ignoreElements().subscribe(); } @Override diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancerReadySubscriber.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancerReadySubscriber.java deleted file mode 100644 index 7f61495374..0000000000 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancerReadySubscriber.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright © 2018-2019, 2022 Apple Inc. and the ServiceTalk project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.servicetalk.http.netty; - -import io.servicetalk.client.api.LoadBalancerReadyEvent; -import io.servicetalk.concurrent.CompletableSource.Processor; -import io.servicetalk.concurrent.PublisherSource.Subscriber; -import io.servicetalk.concurrent.PublisherSource.Subscription; -import io.servicetalk.concurrent.api.Completable; -import io.servicetalk.concurrent.internal.DelayedCancellable; - -import javax.annotation.Nullable; - -import static io.servicetalk.concurrent.api.Completable.completed; -import static io.servicetalk.concurrent.api.Processors.newCompletableProcessor; -import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; - -/** - * Designed to listen for {@link LoadBalancerReadyEvent}s and provide notification when a {@link LoadBalancerReadyEvent} - * returns {@code true} from {@link LoadBalancerReadyEvent#isReady()}. - */ -final class LoadBalancerReadySubscriber extends DelayedCancellable implements Subscriber { - @Nullable - private volatile Processor onHostsAvailable = newCompletableProcessor(); - - /** - * Get {@link Completable} that will complete when a {@link LoadBalancerReadyEvent} returns {@code true} - * from {@link LoadBalancerReadyEvent#isReady()}. - * @return A {@link Completable} that will complete when a {@link LoadBalancerReadyEvent} returns {@code true} - * from {@link LoadBalancerReadyEvent#isReady()}. - */ - public Completable onHostsAvailable() { - Processor onHostsAvailable = this.onHostsAvailable; - return onHostsAvailable == null ? completed() : fromSource(onHostsAvailable); - } - - @Override - public void onSubscribe(final Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(final Object o) { - if (o instanceof LoadBalancerReadyEvent) { - LoadBalancerReadyEvent event = (LoadBalancerReadyEvent) o; - if (event.isReady()) { - Processor onHostsAvailable = this.onHostsAvailable; - if (onHostsAvailable != null) { - this.onHostsAvailable = null; - onHostsAvailable.onComplete(); - } - } else if (onHostsAvailable == null) { - onHostsAvailable = newCompletableProcessor(); - } - } - } - - @Override - public void onError(final Throwable t) { - Processor onHostsAvailable = this.onHostsAvailable; - if (onHostsAvailable != null) { - onHostsAvailable.onError(t); - } - } - - @Override - public void onComplete() { - Processor onHostsAvailable = this.onHostsAvailable; - if (onHostsAvailable != null) { - onHostsAvailable.onError(new IllegalStateException("Subscriber listening for " + - LoadBalancerReadyEvent.class.getSimpleName() + " events completed unexpectedly")); - } - } -} diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index e20ac45ef1..2bc2302ec3 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -20,7 +20,8 @@ import io.servicetalk.client.api.LoadBalancerReadyEvent; import io.servicetalk.client.api.NoAvailableHostException; import io.servicetalk.client.api.ServiceDiscoverer; -import io.servicetalk.concurrent.api.AsyncCloseable; +import io.servicetalk.concurrent.PublisherSource.Subscriber; +import io.servicetalk.concurrent.PublisherSource.Subscription; import io.servicetalk.concurrent.api.AsyncContext; import io.servicetalk.concurrent.api.BiIntFunction; import io.servicetalk.concurrent.api.Completable; @@ -54,15 +55,12 @@ import java.util.function.UnaryOperator; import javax.annotation.Nullable; -import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable; -import static io.servicetalk.concurrent.api.AsyncCloseables.toAsyncCloseable; import static io.servicetalk.concurrent.api.Completable.completed; import static io.servicetalk.concurrent.api.Completable.failed; import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffDeltaJitter; import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffFullJitter; import static io.servicetalk.concurrent.api.RetryStrategies.retryWithExponentialBackoffDeltaJitter; import static io.servicetalk.concurrent.api.RetryStrategies.retryWithExponentialBackoffFullJitter; -import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static io.servicetalk.http.api.HeaderUtils.DEFAULT_HEADER_FILTER; import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY; import static io.servicetalk.http.api.HttpHeaderNames.EXPECT; @@ -129,15 +127,10 @@ public HttpExecutionStrategy requiredOffloads() { } final class ContextAwareRetryingHttpClientFilter extends StreamingHttpClientFilter { - @Nullable private Completable sdStatus; - - @Nullable - private AsyncCloseable closeAsync; - @Nullable - private LoadBalancerReadySubscriber loadBalancerReadySubscriber; + private Publisher lbEventStream; /** * Create a new instance. @@ -150,21 +143,8 @@ private ContextAwareRetryingHttpClientFilter(final FilterableStreamingHttpClient void inject(@Nullable final Publisher lbEventStream, @Nullable final Completable sdStatus) { - assert lbEventStream != null; - assert sdStatus != null; - this.sdStatus = ignoreSdErrors ? null : sdStatus; - - if (waitForLb) { - loadBalancerReadySubscriber = new LoadBalancerReadySubscriber(); - closeAsync = toAsyncCloseable(__ -> { - loadBalancerReadySubscriber.cancel(); - return completed(); - }); - toSource(lbEventStream).subscribe(loadBalancerReadySubscriber); - } else { - loadBalancerReadySubscriber = null; - closeAsync = emptyAsyncCloseable(); - } + this.sdStatus = ignoreSdErrors ? null : requireNonNull(sdStatus); + this.lbEventStream = waitForLb ? requireNonNull(lbEventStream) : null; } private final class OuterRetryStrategy implements BiIntFunction { @@ -187,9 +167,37 @@ public Completable apply(final int count, final Throwable t) { return failed(t); } - if (loadBalancerReadySubscriber != null && t instanceof NoAvailableHostException) { + if (lbEventStream != null && t instanceof NoAvailableHostException) { ++lbNotReadyCount; - final Completable onHostsAvailable = loadBalancerReadySubscriber.onHostsAvailable(); + final Completable onHostsAvailable = lbEventStream + .liftSync(subscriber -> new Subscriber() { + @Override + public void onSubscribe(final Subscription subscription) { + subscriber.onSubscribe(subscription); + } + + @Override + public void onNext(@Nullable final Object o) { + subscriber.onNext(o); + } + + @Override + public void onError(final Throwable t1) { + subscriber.onError(t1); + } + + @Override + public void onComplete() { + subscriber.onError(new IllegalStateException("Subscriber listening for " + + LoadBalancerReadyEvent.class.getSimpleName() + + " completed unexpectedly")); + } + }) + .takeWhile(lbEvent -> + // Don't complete until we get a LoadBalancerReadyEvent that is ready. + !(lbEvent instanceof LoadBalancerReadyEvent) || + !((LoadBalancerReadyEvent) lbEvent).isReady()) + .ignoreElements(); return sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus); } @@ -264,22 +272,6 @@ protected Single request(final StreamingHttpRequester del // duplicatedRequest isn't used below because retryWhen must be applied outside the defer operator for (2). return single.retryWhen(retryStrategy(request, executionContext())); } - - @Override - public Completable closeAsync() { - if (closeAsync != null) { - closeAsync.closeAsync(); - } - return super.closeAsync(); - } - - @Override - public Completable closeAsyncGracefully() { - if (closeAsync != null) { - closeAsync.closeAsyncGracefully(); - } - return super.closeAsyncGracefully(); - } } /** diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java index 4d51e7d523..a41602bd61 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java @@ -31,7 +31,6 @@ import io.servicetalk.http.api.FilterableStreamingHttpConnection; import io.servicetalk.http.api.Http2Exception; import io.servicetalk.http.api.HttpCookiePair; -import io.servicetalk.http.api.HttpEventKey; import io.servicetalk.http.api.HttpExecutionStrategy; import io.servicetalk.http.api.HttpHeaders; import io.servicetalk.http.api.HttpHeadersFactory; @@ -1636,12 +1635,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception Processor requestPayload = newProcessor(); client.request(client.post("/0").payloadBody(fromSource(requestPayload))).toFuture().get(); + Iterator> maxItr = maxConcurrentPubQueue.take().toIterable().iterator(); + serverChannelLatch.await(); Channel serverParentChannel = serverParentChannelRef.get(); serverParentChannel.writeAndFlush(new DefaultHttp2SettingsFrame( new Http2Settings().maxConcurrentStreams(expectedMaxConcurrent))).sync(); - Iterator> maxItr = maxConcurrentPubQueue.take().toIterable().iterator(); // Verify that the initial maxConcurrency value is the default number assertThat("No initial maxConcurrency value", maxItr.hasNext(), is(true)); ConsumableEvent next = maxItr.next(); @@ -2011,22 +2011,12 @@ public void onComplete() { } private static final class TestConnectionFilter extends StreamingHttpConnectionFilter { - private final Publisher> maxConcurrent; - TestConnectionFilter(final FilterableStreamingHttpConnection delegate, Queue connectionQueue, Queue>> maxConcurrentPubQueue) { super(delegate); - maxConcurrent = delegate.transportEventStream(MAX_CONCURRENCY_NO_OFFLOADING).multicast(2); connectionQueue.add(delegate); - maxConcurrentPubQueue.add(maxConcurrent); - } - - @SuppressWarnings("unchecked") - @Override - public Publisher transportEventStream(final HttpEventKey eventKey) { - return eventKey == MAX_CONCURRENCY_NO_OFFLOADING ? (Publisher) maxConcurrent : - super.transportEventStream(eventKey); + maxConcurrentPubQueue.add(delegate.transportEventStream(MAX_CONCURRENCY_NO_OFFLOADING)); } } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/LoadBalancerReadySubscriberTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/LoadBalancerReadySubscriberTest.java deleted file mode 100644 index d0987c37c3..0000000000 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/LoadBalancerReadySubscriberTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright © 2022 Apple Inc. and the ServiceTalk project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.servicetalk.http.netty; - -import io.servicetalk.concurrent.PublisherSource; -import io.servicetalk.concurrent.api.Processors; - -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; - -import java.util.concurrent.ExecutionException; - -import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.junit.jupiter.api.Assertions.assertThrows; - -final class LoadBalancerReadySubscriberTest { - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void terminalPersistsFailure(boolean onError) { - PublisherSource.Processor processor = Processors.newPublisherProcessor(); - LoadBalancerReadySubscriber subscriber = new LoadBalancerReadySubscriber(); - processor.subscribe(subscriber); - - if (onError) { - processor.onError(DELIBERATE_EXCEPTION); - for (int i = 0; i < 5; ++i) { - assertThat(assertThrows(ExecutionException.class, - () -> subscriber.onHostsAvailable().toFuture().get()).getCause(), - is(DELIBERATE_EXCEPTION)); - } - } else { - processor.onComplete(); - for (int i = 0; i < 5; ++i) { - assertThat(assertThrows(ExecutionException.class, - () -> subscriber.onHostsAvailable().toFuture().get()).getCause(), - instanceOf(IllegalStateException.class)); - } - } - } -} diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterAutoRetryStrategiesTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterAutoRetryStrategiesTest.java index 0d3ca84b8a..9fac8c35e3 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterAutoRetryStrategiesTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterAutoRetryStrategiesTest.java @@ -21,6 +21,7 @@ import io.servicetalk.concurrent.Executor; import io.servicetalk.concurrent.api.BiIntFunction; import io.servicetalk.concurrent.api.Completable; +import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.TestCompletable; import io.servicetalk.concurrent.api.TestPublisher; import io.servicetalk.concurrent.test.internal.TestCompletableSubscriber; @@ -305,7 +306,9 @@ private ContextAwareRetryingHttpClientFilter newFilter(final RetryingHttpRequest when(client.executionContext()).then(__ -> executionContext); final ContextAwareRetryingHttpClientFilter f = (ContextAwareRetryingHttpClientFilter) filter.create(client); - f.inject(lbEvents, sdStatus); + Publisher replayLBEvents = lbEvents.replay(1); + replayLBEvents.ignoreElements().subscribe(); + f.inject(replayLBEvents, sdStatus); return f; } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index b6f1c1e9cd..4a1034dc43 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -172,7 +172,8 @@ final class RoundRobinLoadBalancer usedHosts = new ClosedList<>(emptyList())); }); + // Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest signal. + eventStream.ignoreElements().subscribe(); subscribeToEvents(false); }