Skip to content

Commit

Permalink
Minor Processor usage cleanup (#2115)
Browse files Browse the repository at this point in the history
Modifications:
- DefaultSingleAddressHttpClientBuilder$SdStatusCompletable clarify
  why multiple processors are created on error.
- LoadBalancerReadySubscriber to retain the Processor reference
  in a terminal state. Subsequent subscribers will therefore see
  the correct terminal state.
  • Loading branch information
Scottmitch authored Mar 1, 2022
1 parent 49ee1c0 commit cf0a8b8
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -737,6 +737,8 @@ protected void handleSubscribe(final Subscriber subscriber) {

void nextError(final Throwable t) {
seenError = true;
// This state is reused across multiple subscribes, and we reset the processor to deliver the latest error
// to new subscribers.
final CompletableSource.Processor oldProcessor = processor;
oldProcessor.onError(t);
final CompletableSource.Processor newProcessor = newCompletableProcessor();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2019 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2019, 2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,8 +40,7 @@ final class LoadBalancerReadySubscriber extends DelayedCancellable implements Su
* Get {@link Completable} that will complete when a {@link LoadBalancerReadyEvent} returns {@code true}
* from {@link LoadBalancerReadyEvent#isReady()}.
* @return A {@link Completable} that will complete when a {@link LoadBalancerReadyEvent} returns {@code true}
* from {@link LoadBalancerReadyEvent#isReady()}, or {@code null} if this event has already been seen and a
* a {@link LoadBalancerReadyEvent} that returns {@code true} has not been seend.
* from {@link LoadBalancerReadyEvent#isReady()}.
*/
public Completable onHostsAvailable() {
Processor onHostsAvailable = this.onHostsAvailable;
Expand All @@ -58,33 +57,31 @@ public void onNext(final Object o) {
if (o instanceof LoadBalancerReadyEvent) {
LoadBalancerReadyEvent event = (LoadBalancerReadyEvent) o;
if (event.isReady()) {
Processor onHostsAvailable = LoadBalancerReadySubscriber.this.onHostsAvailable;
Processor onHostsAvailable = this.onHostsAvailable;
if (onHostsAvailable != null) {
LoadBalancerReadySubscriber.this.onHostsAvailable = null;
this.onHostsAvailable = null;
onHostsAvailable.onComplete();
}
} else if (LoadBalancerReadySubscriber.this.onHostsAvailable == null) {
LoadBalancerReadySubscriber.this.onHostsAvailable = newCompletableProcessor();
} else if (onHostsAvailable == null) {
onHostsAvailable = newCompletableProcessor();
}
}
}

@Override
public void onError(final Throwable t) {
Processor onHostsAvailable = LoadBalancerReadySubscriber.this.onHostsAvailable;
Processor onHostsAvailable = this.onHostsAvailable;
if (onHostsAvailable != null) {
LoadBalancerReadySubscriber.this.onHostsAvailable = null;
onHostsAvailable.onError(t);
}
}

@Override
public void onComplete() {
Processor onHostsAvailable = LoadBalancerReadySubscriber.this.onHostsAvailable;
Processor onHostsAvailable = this.onHostsAvailable;
if (onHostsAvailable != null) {
LoadBalancerReadySubscriber.this.onHostsAvailable = null;
// Let the load balancer or retry strategy fail any pending requests.
onHostsAvailable.onComplete();
onHostsAvailable.onError(new IllegalStateException("Subscriber listening for " +
LoadBalancerReadyEvent.class.getSimpleName() + " events completed unexpectedly"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static io.servicetalk.http.api.HttpResponseStatus.OK;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -123,6 +124,11 @@ void initializedFailedAlsoFailsReserve() throws InterruptedException {
verifyOnInitializedFailedFailsAction(filter -> filter.reserveConnection(filter.get("/noop")));
}

@Test
void lbCompleteFailedAlsoFailsReserve() throws InterruptedException {
verifyLbCompleteFailedFailsAction(filter -> filter.reserveConnection(filter.get("/noop")));
}

@Test
void serviceDiscovererAlsoFailsRequest() throws InterruptedException {
verifyOnServiceDiscovererErrorFailsAction(filter -> filter.request(filter.get("/noop")));
Expand All @@ -143,8 +149,20 @@ private void verifyOnServiceDiscovererErrorFailsAction(
verifyFailsAction(action, sdStatusCompletable::onError, UNKNOWN_HOST_EXCEPTION);
}

private void verifyFailsAction(Function<StreamingHttpClient, Single<?>> action,
Consumer<Throwable> errorConsumer, Throwable error) throws InterruptedException {
private void verifyLbCompleteFailedFailsAction(
Function<StreamingHttpClient, Single<?>> action) throws InterruptedException {
assertThat(verifyFailsAction0(action, ignored -> loadBalancerPublisher.onComplete(), DELIBERATE_EXCEPTION),
instanceOf(IllegalStateException.class));
}

private void verifyFailsAction(Function<StreamingHttpClient, Single<?>> action, Consumer<Throwable> errorConsumer,
Throwable error) throws InterruptedException {
assertThat(verifyFailsAction0(action, errorConsumer, error), is(error));
}

private Throwable verifyFailsAction0(Function<StreamingHttpClient, Single<?>> action,
Consumer<Throwable> errorConsumer, Throwable error)
throws InterruptedException {
StreamingHttpClient client = TestStreamingHttpClient.from(reqRespFactory, mockExecutionCtx,
appendClientFilterFactory(newAutomaticRetryFilterFactory(loadBalancerPublisher, sdStatusCompletable),
testHandler));
Expand All @@ -164,7 +182,7 @@ private void verifyFailsAction(Function<StreamingHttpClient, Single<?>> action,
// When a failure occurs that should also fail the action!
errorConsumer.accept(error);
latch.await();
assertThat(causeRef.get(), is(error));
return causeRef.get();
}

private void verifyActionIsDelayedUntilAfterInitialized(Function<StreamingHttpClient, Single<?>> action)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright © 2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.Processors;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.concurrent.ExecutionException;

import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;

final class LoadBalancerReadySubscriberTest {

@ParameterizedTest
@ValueSource(booleans = {true, false})
void terminalPersistsFailure(boolean onError) {
PublisherSource.Processor<Object, Object> processor = Processors.newPublisherProcessor();
LoadBalancerReadySubscriber subscriber = new LoadBalancerReadySubscriber();
processor.subscribe(subscriber);

if (onError) {
processor.onError(DELIBERATE_EXCEPTION);
for (int i = 0; i < 5; ++i) {
assertThat(assertThrows(ExecutionException.class,
() -> subscriber.onHostsAvailable().toFuture().get()).getCause(),
is(DELIBERATE_EXCEPTION));
}
} else {
processor.onComplete();
for (int i = 0; i < 5; ++i) {
assertThat(assertThrows(ExecutionException.class,
() -> subscriber.onHostsAvailable().toFuture().get()).getCause(),
instanceOf(IllegalStateException.class));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@
import static io.servicetalk.http.netty.RetryingHttpRequesterFilter.disableAutoRetries;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.core.IsNull.nullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -164,11 +165,11 @@ void defaultForNoAvailableHostMaxRetries() {
TestCompletableSubscriber subscriber = new TestCompletableSubscriber();
toSource(retry).subscribe(subscriber);
if (i < 5) {
subscriber.awaitOnComplete();
assertThat(subscriber.awaitOnError(), instanceOf(IllegalStateException.class));
} else {
assertThrows(NoAvailableHostException.class, () -> {
throw subscriber.awaitOnError();
});
// ambWith operator could return either error back.
assertThat(subscriber.awaitOnError(), anyOf(instanceOf(NoAvailableHostException.class),
instanceOf(IllegalStateException.class)));
}
}
}
Expand Down

0 comments on commit cf0a8b8

Please sign in to comment.