From cf0a8b89447463c1689c22d8956340ccc9490c2d Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Mon, 28 Feb 2022 17:41:01 -0800 Subject: [PATCH] Minor Processor usage cleanup (#2115) Modifications: - DefaultSingleAddressHttpClientBuilder$SdStatusCompletable clarify why multiple processors are created on error. - LoadBalancerReadySubscriber to retain the Processor reference in a terminal state. Subsequent subscribers will therefore see the correct terminal state. --- ...DefaultSingleAddressHttpClientBuilder.java | 4 +- .../netty/LoadBalancerReadySubscriber.java | 23 ++++---- .../LoadBalancerReadyHttpClientTest.java | 24 +++++++- .../LoadBalancerReadySubscriberTest.java | 57 +++++++++++++++++++ ...equesterFilterAutoRetryStrategiesTest.java | 11 ++-- 5 files changed, 97 insertions(+), 22 deletions(-) create mode 100644 servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/LoadBalancerReadySubscriberTest.java diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java index 8c1980dcf7..4e37faafa6 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018-2021 Apple Inc. and the ServiceTalk project authors + * Copyright © 2018-2022 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -737,6 +737,8 @@ protected void handleSubscribe(final Subscriber subscriber) { void nextError(final Throwable t) { seenError = true; + // This state is reused across multiple subscribes, and we reset the processor to deliver the latest error + // to new subscribers. final CompletableSource.Processor oldProcessor = processor; oldProcessor.onError(t); final CompletableSource.Processor newProcessor = newCompletableProcessor(); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancerReadySubscriber.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancerReadySubscriber.java index b9d814ab30..7f61495374 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancerReadySubscriber.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancerReadySubscriber.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018-2019 Apple Inc. and the ServiceTalk project authors + * Copyright © 2018-2019, 2022 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,8 +40,7 @@ final class LoadBalancerReadySubscriber extends DelayedCancellable implements Su * Get {@link Completable} that will complete when a {@link LoadBalancerReadyEvent} returns {@code true} * from {@link LoadBalancerReadyEvent#isReady()}. * @return A {@link Completable} that will complete when a {@link LoadBalancerReadyEvent} returns {@code true} - * from {@link LoadBalancerReadyEvent#isReady()}, or {@code null} if this event has already been seen and a - * a {@link LoadBalancerReadyEvent} that returns {@code true} has not been seend. + * from {@link LoadBalancerReadyEvent#isReady()}. */ public Completable onHostsAvailable() { Processor onHostsAvailable = this.onHostsAvailable; @@ -58,33 +57,31 @@ public void onNext(final Object o) { if (o instanceof LoadBalancerReadyEvent) { LoadBalancerReadyEvent event = (LoadBalancerReadyEvent) o; if (event.isReady()) { - Processor onHostsAvailable = LoadBalancerReadySubscriber.this.onHostsAvailable; + Processor onHostsAvailable = this.onHostsAvailable; if (onHostsAvailable != null) { - LoadBalancerReadySubscriber.this.onHostsAvailable = null; + this.onHostsAvailable = null; onHostsAvailable.onComplete(); } - } else if (LoadBalancerReadySubscriber.this.onHostsAvailable == null) { - LoadBalancerReadySubscriber.this.onHostsAvailable = newCompletableProcessor(); + } else if (onHostsAvailable == null) { + onHostsAvailable = newCompletableProcessor(); } } } @Override public void onError(final Throwable t) { - Processor onHostsAvailable = LoadBalancerReadySubscriber.this.onHostsAvailable; + Processor onHostsAvailable = this.onHostsAvailable; if (onHostsAvailable != null) { - LoadBalancerReadySubscriber.this.onHostsAvailable = null; onHostsAvailable.onError(t); } } @Override public void onComplete() { - Processor onHostsAvailable = LoadBalancerReadySubscriber.this.onHostsAvailable; + Processor onHostsAvailable = this.onHostsAvailable; if (onHostsAvailable != null) { - LoadBalancerReadySubscriber.this.onHostsAvailable = null; - // Let the load balancer or retry strategy fail any pending requests. - onHostsAvailable.onComplete(); + onHostsAvailable.onError(new IllegalStateException("Subscriber listening for " + + LoadBalancerReadyEvent.class.getSimpleName() + " events completed unexpectedly")); } } } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/LoadBalancerReadyHttpClientTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/LoadBalancerReadyHttpClientTest.java index 873b8461d3..d52aa346b5 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/LoadBalancerReadyHttpClientTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/LoadBalancerReadyHttpClientTest.java @@ -59,6 +59,7 @@ import static io.servicetalk.http.api.HttpResponseStatus.OK; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -123,6 +124,11 @@ void initializedFailedAlsoFailsReserve() throws InterruptedException { verifyOnInitializedFailedFailsAction(filter -> filter.reserveConnection(filter.get("/noop"))); } + @Test + void lbCompleteFailedAlsoFailsReserve() throws InterruptedException { + verifyLbCompleteFailedFailsAction(filter -> filter.reserveConnection(filter.get("/noop"))); + } + @Test void serviceDiscovererAlsoFailsRequest() throws InterruptedException { verifyOnServiceDiscovererErrorFailsAction(filter -> filter.request(filter.get("/noop"))); @@ -143,8 +149,20 @@ private void verifyOnServiceDiscovererErrorFailsAction( verifyFailsAction(action, sdStatusCompletable::onError, UNKNOWN_HOST_EXCEPTION); } - private void verifyFailsAction(Function> action, - Consumer errorConsumer, Throwable error) throws InterruptedException { + private void verifyLbCompleteFailedFailsAction( + Function> action) throws InterruptedException { + assertThat(verifyFailsAction0(action, ignored -> loadBalancerPublisher.onComplete(), DELIBERATE_EXCEPTION), + instanceOf(IllegalStateException.class)); + } + + private void verifyFailsAction(Function> action, Consumer errorConsumer, + Throwable error) throws InterruptedException { + assertThat(verifyFailsAction0(action, errorConsumer, error), is(error)); + } + + private Throwable verifyFailsAction0(Function> action, + Consumer errorConsumer, Throwable error) + throws InterruptedException { StreamingHttpClient client = TestStreamingHttpClient.from(reqRespFactory, mockExecutionCtx, appendClientFilterFactory(newAutomaticRetryFilterFactory(loadBalancerPublisher, sdStatusCompletable), testHandler)); @@ -164,7 +182,7 @@ private void verifyFailsAction(Function> action, // When a failure occurs that should also fail the action! errorConsumer.accept(error); latch.await(); - assertThat(causeRef.get(), is(error)); + return causeRef.get(); } private void verifyActionIsDelayedUntilAfterInitialized(Function> action) diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/LoadBalancerReadySubscriberTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/LoadBalancerReadySubscriberTest.java new file mode 100644 index 0000000000..d0987c37c3 --- /dev/null +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/LoadBalancerReadySubscriberTest.java @@ -0,0 +1,57 @@ +/* + * Copyright © 2022 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.http.netty; + +import io.servicetalk.concurrent.PublisherSource; +import io.servicetalk.concurrent.api.Processors; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.concurrent.ExecutionException; + +import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; + +final class LoadBalancerReadySubscriberTest { + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void terminalPersistsFailure(boolean onError) { + PublisherSource.Processor processor = Processors.newPublisherProcessor(); + LoadBalancerReadySubscriber subscriber = new LoadBalancerReadySubscriber(); + processor.subscribe(subscriber); + + if (onError) { + processor.onError(DELIBERATE_EXCEPTION); + for (int i = 0; i < 5; ++i) { + assertThat(assertThrows(ExecutionException.class, + () -> subscriber.onHostsAvailable().toFuture().get()).getCause(), + is(DELIBERATE_EXCEPTION)); + } + } else { + processor.onComplete(); + for (int i = 0; i < 5; ++i) { + assertThat(assertThrows(ExecutionException.class, + () -> subscriber.onHostsAvailable().toFuture().get()).getCause(), + instanceOf(IllegalStateException.class)); + } + } + } +} diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterAutoRetryStrategiesTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterAutoRetryStrategiesTest.java index cab45bd1da..bdaba862c0 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterAutoRetryStrategiesTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterAutoRetryStrategiesTest.java @@ -46,10 +46,11 @@ import static io.servicetalk.http.netty.RetryingHttpRequesterFilter.disableAutoRetries; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.core.IsNull.nullValue; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -164,11 +165,11 @@ void defaultForNoAvailableHostMaxRetries() { TestCompletableSubscriber subscriber = new TestCompletableSubscriber(); toSource(retry).subscribe(subscriber); if (i < 5) { - subscriber.awaitOnComplete(); + assertThat(subscriber.awaitOnError(), instanceOf(IllegalStateException.class)); } else { - assertThrows(NoAvailableHostException.class, () -> { - throw subscriber.awaitOnError(); - }); + // ambWith operator could return either error back. + assertThat(subscriber.awaitOnError(), anyOf(instanceOf(NoAvailableHostException.class), + instanceOf(IllegalStateException.class))); } } }