From cc8dbb95445e827f69a07253060c3814e193d4f0 Mon Sep 17 00:00:00 2001 From: OwenLindsell Date: Wed, 11 Dec 2019 15:45:58 +0000 Subject: [PATCH] Remove RxJava --- .../handlers/StandardHttpPipeline.java | 37 +++--- .../handlers/StandardHttpPipelineTest.java | 6 +- .../netty/connectors/HttpPipelineHandler.java | 51 ++++---- .../connectors/HttpPipelineHandlerTest.java | 111 +++++++++--------- 4 files changed, 104 insertions(+), 101 deletions(-) diff --git a/components/proxy/src/main/java/com/hotels/styx/routing/handlers/StandardHttpPipeline.java b/components/proxy/src/main/java/com/hotels/styx/routing/handlers/StandardHttpPipeline.java index 1549351f74..c8d7d069ae 100644 --- a/components/proxy/src/main/java/com/hotels/styx/routing/handlers/StandardHttpPipeline.java +++ b/components/proxy/src/main/java/com/hotels/styx/routing/handlers/StandardHttpPipeline.java @@ -15,23 +15,20 @@ */ package com.hotels.styx.routing.handlers; -import static java.util.Collections.emptyList; -import static java.util.Objects.requireNonNull; -import static rx.Observable.create; -import static rx.RxReactiveStreams.toObservable; -import static rx.RxReactiveStreams.toPublisher; - -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - import com.hotels.styx.api.Eventual; import com.hotels.styx.api.HttpHandler; import com.hotels.styx.api.HttpInterceptor; import com.hotels.styx.api.LiveHttpRequest; import com.hotels.styx.api.LiveHttpResponse; - import com.hotels.styx.server.track.RequestTracker; -import rx.Observable; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyList; +import static java.util.Objects.requireNonNull; /** * The pipeline consists of a chain of interceptors followed by a handler. @@ -99,21 +96,27 @@ public Eventual proceed(LiveHttpRequest request) { requestTracker.markRequestAsSent(request); - return new Eventual<>(toPublisher(toObservable(client.handle(request, this.context)) - .compose(StandardHttpPipeline::sendErrorOnDoubleSubscription))); + return new Eventual<>(new SingleSubscriptionPublisher(client.handle(request, this.context))); } } - private static Observable sendErrorOnDoubleSubscription(Observable original) { - AtomicInteger subscriptionCounter = new AtomicInteger(); + private static final class SingleSubscriptionPublisher implements Publisher { + + private AtomicInteger subscriptionCounter = new AtomicInteger(); + private Publisher original; - return create(subscriber -> { + public SingleSubscriptionPublisher(Publisher original) { + this.original = original; + } + + @Override + public void subscribe(Subscriber subscriber) { if (subscriptionCounter.incrementAndGet() > 1) { subscriber.onError(new IllegalStateException("Response already subscribed. Additional subscriptions forbidden.")); } else { original.subscribe(subscriber); } - }); + } } } diff --git a/components/proxy/src/test/java/com/hotels/styx/routing/handlers/StandardHttpPipelineTest.java b/components/proxy/src/test/java/com/hotels/styx/routing/handlers/StandardHttpPipelineTest.java index fbf4868620..054620e9e3 100644 --- a/components/proxy/src/test/java/com/hotels/styx/routing/handlers/StandardHttpPipelineTest.java +++ b/components/proxy/src/test/java/com/hotels/styx/routing/handlers/StandardHttpPipelineTest.java @@ -43,9 +43,7 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static rx.RxReactiveStreams.toObservable; public class StandardHttpPipelineTest { @Test @@ -167,7 +165,7 @@ public void sendsExceptionUponMultipleSubscription() { assertThat(response.status(), is(OK)); assertThrows(IllegalStateException.class, - () -> toObservable(responseObservable).toBlocking().first()); + () -> Mono.from(responseObservable).block()); } @ParameterizedTest @@ -180,7 +178,7 @@ public void sendsExceptionUponExtraSubscriptionInsideInterceptor(HttpInterceptor Eventual responseObservable = pipeline.handle(get("/").build(), HttpInterceptorContext.create()); assertThrows(IllegalStateException.class, - () -> toObservable(responseObservable).toBlocking().first()); + () -> Mono.from(responseObservable).block()); } private static Stream multipleSubscriptionInterceptors() { diff --git a/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandler.java b/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandler.java index 37f9d15571..059d7dc0d5 100644 --- a/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandler.java +++ b/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandler.java @@ -20,6 +20,7 @@ import com.hotels.styx.api.Buffer; import com.hotels.styx.api.ByteStream; import com.hotels.styx.api.ContentOverflowException; +import com.hotels.styx.api.Eventual; import com.hotels.styx.api.HttpHandler; import com.hotels.styx.api.HttpInterceptor; import com.hotels.styx.api.HttpResponseStatus; @@ -51,11 +52,10 @@ import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.TooLongFrameException; +import org.reactivestreams.Subscription; import org.slf4j.Logger; +import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; -import rx.Observable; -import rx.Subscriber; -import rx.Subscription; import javax.net.ssl.SSLHandshakeException; import java.io.IOException; @@ -87,7 +87,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; import static org.slf4j.LoggerFactory.getLogger; -import static rx.RxReactiveStreams.toObservable; /** * Passes request to HTTP Pipeline. @@ -265,25 +264,29 @@ private State onLegitimateRequest(LiveHttpRequest request, ChannelHandlerContext // the same call stack as "onLegitimateRequest" handler. This happens when a plugin // generates a response. try { - Observable responseObservable = toObservable(httpPipeline.handle(v11Request, newInterceptorContext(ctx))); - subscription = responseObservable - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - eventProcessor.submit(new ResponseObservableCompletedEvent(ctx, request.id())); - } - - @Override - public void onError(Throwable cause) { - eventProcessor.submit(new ResponseObservableErrorEvent(ctx, cause, request.id())); - } - - @Override - public void onNext(LiveHttpResponse response) { - eventProcessor.submit(new ResponseReceivedEvent(response, ctx)); - } - } - ); + Eventual responseEventual = httpPipeline.handle(v11Request, newInterceptorContext(ctx)); + responseEventual.subscribe(new BaseSubscriber() { + @Override + public void hookOnSubscribe(Subscription s) { + subscription = s; + s.request(1); + } + + @Override + public void hookOnComplete() { + eventProcessor.submit(new ResponseObservableCompletedEvent(ctx, request.id())); + } + + @Override + public void hookOnError(Throwable cause) { + eventProcessor.submit(new ResponseObservableErrorEvent(ctx, cause, request.id())); + } + + @Override + public void hookOnNext(LiveHttpResponse response) { + eventProcessor.submit(new ResponseReceivedEvent(response, ctx)); + } + }); return WAITING_FOR_RESPONSE; } catch (Throwable cause) { @@ -628,7 +631,7 @@ private static class ResponseObservableCompletedEvent { private void cancelSubscription() { if (subscription != null) { - subscription.unsubscribe(); + subscription.cancel(); } } diff --git a/components/server/src/test/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandlerTest.java b/components/server/src/test/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandlerTest.java index d55f91ab77..fab6cdfa6d 100644 --- a/components/server/src/test/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandlerTest.java +++ b/components/server/src/test/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandlerTest.java @@ -46,9 +46,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import reactor.core.publisher.EmitterProcessor; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import rx.Observable; -import rx.subjects.PublishSubject; import javax.net.ssl.SSLHandshakeException; import java.io.IOException; @@ -101,11 +101,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import static rx.RxReactiveStreams.toPublisher; public class HttpPipelineHandlerTest { private final HttpHandler respondingHandler = (request, context) -> Eventual.of(response(OK).build()); - private final HttpHandler doNotRespondHandler = (request, context) -> new Eventual<>(toPublisher(Observable.never())); + private final HttpHandler doNotRespondHandler = (request, context) -> new Eventual<>(Mono.never()); private HttpErrorStatusListener errorListener; private CodaHaleMetricRegistry metrics; @@ -119,8 +118,8 @@ public Eventual handle(LiveHttpRequest request, HttpIntercepto }); private ChannelHandlerContext ctx; - private PublishSubject responseObservable; - private PublishSubject responseObservable2; + private EmitterProcessor responseObservable; + private EmitterProcessor responseObservable2; private CompletableFuture writerFuture; private HttpResponseWriter responseWriter; private HttpPipelineHandler handler; @@ -145,7 +144,7 @@ public void setUp() throws Exception { statsCollector = mock(RequestStatsCollector.class); errorListener = mock(HttpErrorStatusListener.class); ctx = mockCtx(); - responseObservable = PublishSubject.create(); + responseObservable = EmitterProcessor.create(); responseUnsubscribed = new AtomicBoolean(false); writerFuture = new CompletableFuture<>(); @@ -158,7 +157,7 @@ public void setUp() throws Exception { .thenReturn(responseWriter); pipeline = mock(HttpHandler.class); - when(pipeline.handle(nullable(LiveHttpRequest.class), nullable(HttpInterceptor.Context.class))).thenReturn(new Eventual<>(toPublisher(responseObservable.doOnUnsubscribe(() -> responseUnsubscribed.set(true))))); + when(pipeline.handle(nullable(LiveHttpRequest.class), nullable(HttpInterceptor.Context.class))).thenReturn(new Eventual<>(Flux.from(responseObservable.doOnCancel(() -> responseUnsubscribed.set(true))))); request = get("/foo").id("REQUEST-1-ID").build(); response = response().build(); @@ -171,7 +170,7 @@ public void setUp() throws Exception { } private void setUpFor2Requests() throws Exception { - responseObservable2 = PublishSubject.create(); + responseObservable2 = EmitterProcessor.create(); responseUnsubscribed2 = new AtomicBoolean(false); response2 = response().build(); @@ -187,8 +186,8 @@ private void setUpFor2Requests() throws Exception { pipeline = mock(HttpHandler.class); when(pipeline.handle(anyObject(), any(HttpInterceptor.Context.class))) - .thenReturn(new Eventual<>(toPublisher(responseObservable.doOnUnsubscribe(() -> responseUnsubscribed.set(true))))) - .thenReturn(new Eventual<>(toPublisher(responseObservable2.doOnUnsubscribe(() -> responseUnsubscribed2.set(true))))); + .thenReturn(new Eventual<>(responseObservable.doOnCancel(() -> responseUnsubscribed.set(true)))) + .thenReturn(new Eventual<>(responseObservable2.doOnCancel(() -> responseUnsubscribed2.set(true)))); request2 = get("/bar").id("REQUEST-2-ID").build(); @@ -200,31 +199,6 @@ public void tearDown() { logger.stop(); } - private HttpPipelineHandler createHandler(HttpHandler pipeline) throws Exception { - metrics = new CodaHaleMetricRegistry(); - HttpPipelineHandler handler = handlerWithMocks(pipeline) - .responseWriterFactory(responseWriterFactory) - .build(); - - handler.channelActive(ctx); - - return handler; - } - - private void setupHandlerTo(State targetState) throws Exception { - handler = createHandler(pipeline); - - if (targetState == WAITING_FOR_RESPONSE) { - handler.channelRead0(ctx, request); - assertThat(handler.state(), is(WAITING_FOR_RESPONSE)); - } else if (targetState == SENDING_RESPONSE) { - handler.channelRead0(ctx, request); - assertThat(handler.state(), is(WAITING_FOR_RESPONSE)); - responseObservable.onNext(response); - assertThat(handler.state(), is(SENDING_RESPONSE)); - } - } - @Test public void mapsWrappedBadRequestExceptionToBadRequest400ResponseCode() { EmbeddedChannel channel = buildEmbeddedChannel(handlerWithMocks()); @@ -323,7 +297,7 @@ public void allowsResponseObservableToCompleteAfterAfterDisconnect() throws Exce handler.channelInactive(ctx); // ... only after that the response observable completes: - responseObservable.onCompleted(); + responseObservable.onComplete(); // ... then treat it like a successfully sent response: writerFuture.complete(null); @@ -407,7 +381,7 @@ public void proxiesRequestAndResponse() throws Exception { verify(statsCollector).onRequest(request.id()); responseObservable.onNext(response); - responseObservable.onCompleted(); + responseObservable.onComplete(); assertThat(handler.state(), is(SENDING_RESPONSE)); writerFuture.complete(null); @@ -429,7 +403,7 @@ public void prematureRequestIsNotProxiedUntilPreviousResponseIsSuccessfullyWritt // Response arrives, together with response onComplete event. // The response onComplete arrives *before* response has been written. responseObservable.onNext(response); - responseObservable.onCompleted(); + responseObservable.onComplete(); assertThat(handler.state(), is(SENDING_RESPONSE)); // Receive second request while still sending previous response. @@ -461,7 +435,7 @@ public void proxiesPrematureRequestAfterPreviousResponseIsSuccessfullyWritten() // Response arrives, together with response onComplete event. // The response onComplete arrives *before* response has been written. responseObservable.onNext(response); - responseObservable.onCompleted(); + responseObservable.onComplete(); assertThat(handler.state(), is(SENDING_RESPONSE)); verify(responseWriter).write(any(LiveHttpResponse.class)); @@ -493,7 +467,7 @@ public void closesConnectionWhenMultiplePrematureRequestsAreDetectedInSendingRes // Response arrives. responseObservable.onNext(response); - responseObservable.onCompleted(); + responseObservable.onComplete(); assertThat(handler.state(), is(SENDING_RESPONSE)); verify(responseWriter).write(nullable(LiveHttpResponse.class)); @@ -549,7 +523,7 @@ public void closesTheConnectionAfterProxyingWhenConnectionHeaderHasValueClose() verify(statsCollector).onRequest(oneShotRequest.id()); responseObservable.onNext(response); - responseObservable.onCompleted(); + responseObservable.onComplete(); assertThat(handler.state(), is(SENDING_RESPONSE)); writerFuture.complete(null); @@ -885,7 +859,7 @@ public void discardsOnCompleteEventsForEarlierRequestsInWaitingForResponseState( handler.channelRead0(ctx, request2); assertThat(handler.state(), is(WAITING_FOR_RESPONSE)); - responseObservable.onCompleted(); + responseObservable.onComplete(); assertThat(handler.state(), is(WAITING_FOR_RESPONSE)); } @@ -908,7 +882,7 @@ public void discardsOnCompleteEventsForEarlierRequestsInSendingResponseState() t responseObservable2.onNext(response2); assertThat(handler.state(), is(SENDING_RESPONSE)); - responseObservable.onCompleted(); + responseObservable.onComplete(); assertThat(handler.state(), is(SENDING_RESPONSE)); } @@ -953,18 +927,6 @@ public void removesOngoingResponeFromLogMessages() throws Exception { assertTrue(logger.lastMessage().getMessage().contains("ongoingResponse=null")); } - private static HttpResponseWriterFactory responseWriterFactory(CompletableFuture future) { - HttpResponseWriterFactory writerFactory = mock(HttpResponseWriterFactory.class); - HttpResponseWriter responseWriter = mock(HttpResponseWriter.class); - when(writerFactory.create(any(ChannelHandlerContext.class))).thenReturn(responseWriter); - when(responseWriter.write(any(LiveHttpResponse.class))).thenReturn(future); - return writerFactory; - } - - private static void activateChannel(ChannelHandlerContext ctx) { - when(ctx.channel().isActive()).thenReturn(true); - } - @Test public void cancelsOngoingRequestWhenSpuriousRequestArrivesInWaitingForResponseState() throws Exception { // - writes EMPTY_LAST_CONTENT and closes the channel @@ -996,6 +958,31 @@ public void logsSslHandshakeErrors() throws Exception { loggingEvent(INFO, "SSL handshake failure from incoming connection .*"))); } + private void setupHandlerTo(State targetState) throws Exception { + handler = createHandler(pipeline); + + if (targetState == WAITING_FOR_RESPONSE) { + handler.channelRead0(ctx, request); + assertThat(handler.state(), is(WAITING_FOR_RESPONSE)); + } else if (targetState == SENDING_RESPONSE) { + handler.channelRead0(ctx, request); + assertThat(handler.state(), is(WAITING_FOR_RESPONSE)); + responseObservable.onNext(response); + assertThat(handler.state(), is(SENDING_RESPONSE)); + } + } + + private HttpPipelineHandler createHandler(HttpHandler pipeline) throws Exception { + metrics = new CodaHaleMetricRegistry(); + HttpPipelineHandler handler = handlerWithMocks(pipeline) + .responseWriterFactory(responseWriterFactory) + .build(); + + handler.channelActive(ctx); + + return handler; + } + private HttpPipelineHandler.Builder handlerWithMocks() { return handlerWithMocks(pipeline); } @@ -1008,6 +995,18 @@ private HttpPipelineHandler.Builder handlerWithMocks(HttpHandler pipeline) { .metricRegistry(metrics); } + private static HttpResponseWriterFactory responseWriterFactory(CompletableFuture future) { + HttpResponseWriterFactory writerFactory = mock(HttpResponseWriterFactory.class); + HttpResponseWriter responseWriter = mock(HttpResponseWriter.class); + when(writerFactory.create(any(ChannelHandlerContext.class))).thenReturn(responseWriter); + when(responseWriter.write(any(LiveHttpResponse.class))).thenReturn(future); + return writerFactory; + } + + private static void activateChannel(ChannelHandlerContext ctx) { + when(ctx.channel().isActive()).thenReturn(true); + } + private static ChannelHandlerContext mockCtx() { ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); ChannelFuture future = channelFutureOk();