From 4d226c9463035c40bc3b3c0498d1027944ed2ae1 Mon Sep 17 00:00:00 2001 From: OwenLindsell Date: Mon, 18 Nov 2019 11:12:27 +0000 Subject: [PATCH 01/12] replace rx observable with reactor flux --- .../com/hotels/styx/client/Connection.java | 6 ++- .../com/hotels/styx/client/Operation.java | 37 ------------------- .../styx/client/StyxHostHttpClient.java | 4 +- .../hotels/styx/client/StyxHttpClient.java | 6 +-- .../connectionpool/ExpiringConnection.java | 8 ++-- .../stubs/StubConnectionFactory.java | 4 +- .../connectionpool/HttpRequestOperation.java | 33 +++++++++-------- .../netty/connectionpool/NettyConnection.java | 4 +- .../NettyToStyxResponsePropagator.java | 22 ++++++----- .../styx/client/StyxHostHttpClientTest.java | 22 ++++++----- .../NettyToStyxResponsePropagatorTest.java | 14 +++---- .../RequestsToOriginMetricsCollectorTest.java | 4 +- 12 files changed, 65 insertions(+), 99 deletions(-) delete mode 100644 components/client/src/main/java/com/hotels/styx/client/Operation.java diff --git a/components/client/src/main/java/com/hotels/styx/client/Connection.java b/components/client/src/main/java/com/hotels/styx/client/Connection.java index a1d04d90fb..9a189381a9 100644 --- a/components/client/src/main/java/com/hotels/styx/client/Connection.java +++ b/components/client/src/main/java/com/hotels/styx/client/Connection.java @@ -18,6 +18,8 @@ import com.hotels.styx.api.LiveHttpRequest; import com.hotels.styx.api.LiveHttpResponse; import com.hotels.styx.api.extension.Origin; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import rx.Observable; @@ -47,9 +49,9 @@ interface Factory { * Writes HTTP request to a remote peer in the context of this connection. * * @param request - * @return an observable that provides the response + * @return a Publisher that provides the response */ - Observable write(LiveHttpRequest request); + Publisher write(LiveHttpRequest request); /** * Returns if the underlying connection is still active. diff --git a/components/client/src/main/java/com/hotels/styx/client/Operation.java b/components/client/src/main/java/com/hotels/styx/client/Operation.java deleted file mode 100644 index 3b14ec9c0c..0000000000 --- a/components/client/src/main/java/com/hotels/styx/client/Operation.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - Copyright (C) 2013-2018 Expedia Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ -package com.hotels.styx.client; - -import rx.Observable; - -/** - * An operation that can be executed on a connection and that returns an observable to provide the result. - *

- * Has an associated application ID. - * - * @param connection type - * @param result type - */ -public interface Operation { - - /** - * Executes the operation. - * - * @param connection connection to operate on - * @return an observable to provide the result - */ - Observable execute(C connection); -} diff --git a/components/client/src/main/java/com/hotels/styx/client/StyxHostHttpClient.java b/components/client/src/main/java/com/hotels/styx/client/StyxHostHttpClient.java index 880c5ca9a3..857909879e 100644 --- a/components/client/src/main/java/com/hotels/styx/client/StyxHostHttpClient.java +++ b/components/client/src/main/java/com/hotels/styx/client/StyxHostHttpClient.java @@ -23,7 +23,6 @@ import com.hotels.styx.client.connectionpool.ConnectionPool; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; -import rx.RxReactiveStreams; import static java.util.Objects.requireNonNull; @@ -44,9 +43,8 @@ public static StyxHostHttpClient create(ConnectionPool pool) { public Publisher sendRequest(LiveHttpRequest request) { return Flux.from(pool.borrowConnection()) .flatMap(connection -> { - Publisher write = RxReactiveStreams.toPublisher(connection.write(request)); - return ResponseEventListener.from(write) + return ResponseEventListener.from(connection.write(request)) .whenCancelled(() -> pool.closeConnection(connection)) .whenResponseError(cause -> pool.closeConnection(connection)) .whenContentError(cause -> pool.closeConnection(connection)) diff --git a/components/client/src/main/java/com/hotels/styx/client/StyxHttpClient.java b/components/client/src/main/java/com/hotels/styx/client/StyxHttpClient.java index 61ed65a842..e6853a92c6 100644 --- a/components/client/src/main/java/com/hotels/styx/client/StyxHttpClient.java +++ b/components/client/src/main/java/com/hotels/styx/client/StyxHttpClient.java @@ -28,7 +28,6 @@ import com.hotels.styx.client.ssl.SslContextFactory; import io.netty.handler.ssl.SslContext; import reactor.core.publisher.Mono; -import rx.RxReactiveStreams; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -122,9 +121,8 @@ static Mono sendRequestInternal(NettyConnectionFactory connect new ConnectionSettings(params.connectTimeoutMillis()), sslContext ).flatMap(connection -> - Mono.from(RxReactiveStreams.toPublisher( - connection.write(networkRequest) - .doOnTerminate(connection::close))) + Mono.from(connection.write(networkRequest)) + .doOnTerminate(connection::close) ); return responseObservable; diff --git a/components/client/src/main/java/com/hotels/styx/client/connectionpool/ExpiringConnection.java b/components/client/src/main/java/com/hotels/styx/client/connectionpool/ExpiringConnection.java index ca8395bc83..9e1d4e0a30 100644 --- a/components/client/src/main/java/com/hotels/styx/client/connectionpool/ExpiringConnection.java +++ b/components/client/src/main/java/com/hotels/styx/client/connectionpool/ExpiringConnection.java @@ -1,5 +1,5 @@ /* - Copyright (C) 2013-2018 Expedia Inc. + Copyright (C) 2013-2019 Expedia Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -19,9 +19,9 @@ import com.google.common.base.Ticker; import com.hotels.styx.api.LiveHttpRequest; import com.hotels.styx.api.LiveHttpResponse; -import com.hotels.styx.client.Connection; import com.hotels.styx.api.extension.Origin; -import rx.Observable; +import com.hotels.styx.client.Connection; +import org.reactivestreams.Publisher; import java.util.function.Supplier; @@ -53,7 +53,7 @@ public boolean isConnected() { } @Override - public Observable write(LiveHttpRequest request) { + public Publisher write(LiveHttpRequest request) { return nettyConnection.write(request); } diff --git a/components/client/src/main/java/com/hotels/styx/client/connectionpool/stubs/StubConnectionFactory.java b/components/client/src/main/java/com/hotels/styx/client/connectionpool/stubs/StubConnectionFactory.java index 66e9919b74..3dff712ac8 100644 --- a/components/client/src/main/java/com/hotels/styx/client/connectionpool/stubs/StubConnectionFactory.java +++ b/components/client/src/main/java/com/hotels/styx/client/connectionpool/stubs/StubConnectionFactory.java @@ -21,8 +21,8 @@ import com.hotels.styx.api.extension.Origin; import com.hotels.styx.client.Connection; import com.hotels.styx.client.ConnectionSettings; +import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; -import rx.Observable; import static com.google.common.base.Objects.toStringHelper; @@ -50,7 +50,7 @@ public StubConnection(Origin origin) { } @Override - public Observable write(LiveHttpRequest request) { + public Publisher write(LiveHttpRequest request) { throw new UnsupportedOperationException("Not implemented"); } diff --git a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java index 31a7e9509f..fd242b4cc9 100644 --- a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java +++ b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java @@ -24,7 +24,6 @@ import com.hotels.styx.api.Requests; import com.hotels.styx.api.exceptions.TransportLostException; import com.hotels.styx.api.extension.Origin; -import com.hotels.styx.client.Operation; import com.hotels.styx.client.OriginStatsFactory; import com.hotels.styx.common.format.HttpMessageFormatter; import com.hotels.styx.common.format.SanitisedHttpMessageFormatter; @@ -38,7 +37,10 @@ import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.HttpObject; import io.netty.handler.timeout.IdleStateHandler; +import org.reactivestreams.Publisher; import org.slf4j.Logger; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import rx.Observable; import rx.Subscriber; @@ -61,7 +63,7 @@ /** * An operation that writes an HTTP request to an origin. */ -public class HttpRequestOperation implements Operation { +public class HttpRequestOperation { private static final String IDLE_HANDLER_NAME = "idle-handler"; private static final Logger LOGGER = getLogger(HttpRequestOperation.class); @@ -128,18 +130,17 @@ private static boolean requestIsOngoing(RequestBodyChunkSubscriber bodyChunkSubs return bodyChunkSubscriber != null && bodyChunkSubscriber.requestIsOngoing(); } - @Override - public Observable execute(NettyConnection nettyConnection) { + public Publisher execute(NettyConnection nettyConnection) { AtomicReference requestRequestBodyChunkSubscriber = new AtomicReference<>(); requestTime = System.currentTimeMillis(); executeCount.incrementAndGet(); - Observable observable = Observable.create(subscriber -> { + Flux responseFlux = Flux.create(sink -> { if (nettyConnection.isConnected()) { RequestBodyChunkSubscriber bodyChunkSubscriber = new RequestBodyChunkSubscriber(request, nettyConnection); requestRequestBodyChunkSubscriber.set(bodyChunkSubscriber); - addProxyBridgeHandlers(nettyConnection, subscriber); - new WriteRequestToOrigin(subscriber, nettyConnection, request, bodyChunkSubscriber) + addProxyBridgeHandlers(nettyConnection, sink); + new WriteRequestToOrigin(sink, nettyConnection, request, bodyChunkSubscriber) .write(); if (requestLoggingEnabled) { httpRequestMessageLogger.logRequest(request, nettyConnection.getOrigin()); @@ -148,13 +149,13 @@ public Observable execute(NettyConnection nettyConnection) { }); if (requestLoggingEnabled) { - observable = observable + responseFlux = responseFlux .doOnNext(response -> { httpRequestMessageLogger.logResponse(request, response); }); } - return observable.map(response -> + return responseFlux.map(response -> Requests.doFinally(response, cause -> { if (nettyConnection.isConnected()) { removeProxyBridgeHandlers(nettyConnection); @@ -168,7 +169,7 @@ public Observable execute(NettyConnection nettyConnection) { })); } - private void addProxyBridgeHandlers(NettyConnection nettyConnection, Subscriber observer) { + private void addProxyBridgeHandlers(NettyConnection nettyConnection, FluxSink sink) { Origin origin = nettyConnection.getOrigin(); Channel channel = nettyConnection.channel(); channel.pipeline().addLast(IDLE_HANDLER_NAME, new IdleStateHandler(0, 0, responseTimeoutMillis, MILLISECONDS)); @@ -178,7 +179,7 @@ private void addProxyBridgeHandlers(NettyConnection nettyConnection, Subscriber< new RequestsToOriginMetricsCollector(originStatsFactory.originStats(origin)))); channel.pipeline().addLast( NettyToStyxResponsePropagator.NAME, - new NettyToStyxResponsePropagator(observer, origin, responseTimeoutMillis, MILLISECONDS, request)); + new NettyToStyxResponsePropagator(sink, origin, responseTimeoutMillis, MILLISECONDS, request)); } private void removeProxyBridgeHandlers(NettyConnection connection) { @@ -224,14 +225,14 @@ public String toString() { } private static final class WriteRequestToOrigin { - private final Subscriber responseFromOriginObserver; + private final FluxSink responseFromOriginFlux; private final NettyConnection nettyConnection; private final LiveHttpRequest request; private final RequestBodyChunkSubscriber requestBodyChunkSubscriber; - private WriteRequestToOrigin(Subscriber responseFromOriginObserver, NettyConnection nettyConnection, LiveHttpRequest request, + private WriteRequestToOrigin(FluxSink responseFromOriginFlux, NettyConnection nettyConnection, LiveHttpRequest request, RequestBodyChunkSubscriber requestBodyChunkSubscriber) { - this.responseFromOriginObserver = responseFromOriginObserver; + this.responseFromOriginFlux = responseFromOriginFlux; this.nettyConnection = nettyConnection; this.request = request; this.requestBodyChunkSubscriber = requestBodyChunkSubscriber; @@ -245,7 +246,7 @@ public void write() { originChannel.writeAndFlush(messageHeaders) .addListener(subscribeToRequestBody()); } else { - responseFromOriginObserver.onError(new TransportLostException(originChannel.remoteAddress(), nettyConnection.getOrigin())); + responseFromOriginFlux.error(new TransportLostException(originChannel.remoteAddress(), nettyConnection.getOrigin())); } } @@ -259,7 +260,7 @@ private ChannelFutureListener subscribeToRequestBody() { String channelIdentifier = String.format("%s -> %s", nettyConnection.channel().localAddress(), nettyConnection.channel().remoteAddress()); LOGGER.error(format("Failed to send request headers. origin=%s connection=%s request=%s", nettyConnection.getOrigin(), channelIdentifier, request), headersFuture.cause()); - responseFromOriginObserver.onError(new TransportLostException(nettyConnection.channel().remoteAddress(), nettyConnection.getOrigin())); + responseFromOriginFlux.error(new TransportLostException(nettyConnection.channel().remoteAddress(), nettyConnection.getOrigin())); } }; } diff --git a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyConnection.java b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyConnection.java index e718a3d358..0f0e34b1a6 100644 --- a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyConnection.java +++ b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyConnection.java @@ -29,7 +29,7 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.util.AttributeKey; -import rx.Observable; +import org.reactivestreams.Publisher; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -91,7 +91,7 @@ private static void addChannelHandlers(Channel channel, HttpConfig httpConfig, S } @Override - public Observable write(LiveHttpRequest request) { + public Publisher write(LiveHttpRequest request) { return this.requestOperationFactory.newHttpRequestOperation(request).execute(this); } diff --git a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java index ab9a0b2c5e..ca8d6a15f5 100644 --- a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java +++ b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java @@ -34,6 +34,8 @@ import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.timeout.IdleStateEvent; import org.slf4j.Logger; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.MonoSink; import rx.Observable; import rx.Producer; import rx.Subscriber; @@ -60,7 +62,7 @@ final class NettyToStyxResponsePropagator extends SimpleChannelInboundHandler { private static final Logger LOGGER = getLogger(NettyToStyxResponsePropagator.class); private final AtomicBoolean responseCompleted = new AtomicBoolean(false); - private final Subscriber responseObserver; + private final FluxSink sink; private final LiveHttpRequest request; private final Origin origin; @@ -71,23 +73,23 @@ final class NettyToStyxResponsePropagator extends SimpleChannelInboundHandler { // to be delivered from the same thread. private boolean toBeClosed; - NettyToStyxResponsePropagator(Subscriber responseObserver, Origin origin) { - this(responseObserver, origin, 5L, TimeUnit.SECONDS); + NettyToStyxResponsePropagator(FluxSink sink, Origin origin) { + this(sink, origin, 5L, TimeUnit.SECONDS); } - NettyToStyxResponsePropagator(Subscriber responseObserver, + NettyToStyxResponsePropagator(FluxSink sink, Origin origin, long idleTimeout, TimeUnit timeUnit) { - this(responseObserver, origin, idleTimeout, timeUnit, null); + this(sink, origin, idleTimeout, timeUnit, null); } - NettyToStyxResponsePropagator(Subscriber responseObserver, + NettyToStyxResponsePropagator(FluxSink sink, Origin origin, long idleTimeout, TimeUnit timeUnit, LiveHttpRequest request) { - this.responseObserver = responseObserver; + this.sink = sink; this.origin = origin; this.idleTimeoutMillis = timeUnit.toMillis(idleTimeout); this.request = request; @@ -150,7 +152,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Except } LiveHttpResponse response = toStyxResponse(nettyResponse, contentObservable, origin); - this.responseObserver.onNext(response); + this.sink.next(response); } if (msg instanceof HttpContent) { ByteBuf content = ((ByteBufHolder) msg).content(); @@ -210,13 +212,13 @@ private FlowControllingHttpContentProducer createProducer(ChannelHandlerContext private void emitResponseCompleted() { if (responseCompleted.compareAndSet(false, true)) { - responseObserver.onCompleted(); + sink.complete(); } } private void emitResponseError(Throwable cause) { if (responseCompleted.compareAndSet(false, true)) { - this.responseObserver.onError(cause); + this.sink.error(cause); } } diff --git a/components/client/src/test/unit/java/com/hotels/styx/client/StyxHostHttpClientTest.java b/components/client/src/test/unit/java/com/hotels/styx/client/StyxHostHttpClientTest.java index 68e8b14b79..4713f55a3a 100644 --- a/components/client/src/test/unit/java/com/hotels/styx/client/StyxHostHttpClientTest.java +++ b/components/client/src/test/unit/java/com/hotels/styx/client/StyxHostHttpClientTest.java @@ -26,6 +26,8 @@ import org.junit.jupiter.api.Test; import org.reactivestreams.Subscription; import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.EmitterProcessor; +import reactor.core.publisher.Flux; import reactor.core.publisher.Flux; import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; @@ -47,7 +49,7 @@ public class StyxHostHttpClientTest { private LiveHttpRequest request; private LiveHttpResponse response; - private PublishSubject responseProvider; + private EmitterProcessor responseProvider; @BeforeEach public void setUp() { @@ -57,12 +59,12 @@ public void setUp() { .body("xyz", UTF_8) .build() .stream(); - responseProvider = PublishSubject.create(); + responseProvider = EmitterProcessor.create(); } @Test public void returnsConnectionBackToPool() { - Connection connection = mockConnection(just(response)); + Connection connection = mockConnection(Flux.just(response)); ConnectionPool pool = mockPool(connection); StyxHostHttpClient hostClient = new StyxHostHttpClient(pool); @@ -80,9 +82,9 @@ public void returnsConnectionBackToPool() { @Test public void ignoresCancelledHeaders() { // Request observable unsubscribe/cancel has to be ignored after "onNext" event. - // This is because Reactor Mono will automatically cancel after an event has + // This is because Reactor Flux will automatically cancel after an event has // been published. - Connection connection = mockConnection(just(response)); + Connection connection = mockConnection(Flux.just(response)); ConnectionPool pool = mockPool(connection); AtomicReference transformedResponse = new AtomicReference<>(); @@ -107,7 +109,7 @@ public void ignoresCancelledHeaders() { @Test public void releasesIfRequestIsCancelledBeforeHeaders() { - Connection connection = mockConnection(PublishSubject.create()); + Connection connection = mockConnection(EmitterProcessor.create()); ConnectionPool pool = mockPool(connection); StyxHostHttpClient hostClient = new StyxHostHttpClient(pool); @@ -152,7 +154,7 @@ public void ignoresResponseObservableErrorsAfterHeaders() { @Test public void terminatesConnectionWhenResponseObservableCompletesWithoutHeaders() { // A connection that yields no response: - Connection connection = mockConnection(Observable.empty()); + Connection connection = mockConnection(Flux.empty()); ConnectionPool pool = mockPool(connection); StyxHostHttpClient hostClient = new StyxHostHttpClient(pool); @@ -168,7 +170,7 @@ public void terminatesConnectionWhenResponseObservableCompletesWithoutHeaders() @Test public void releasesConnectionWhenResponseFailsBeforeHeaders() { - Connection connection = mockConnection(Observable.error(new RuntimeException())); + Connection connection = mockConnection(Flux.error(new RuntimeException())); ConnectionPool pool = mockPool(connection); StyxHostHttpClient hostClient = new StyxHostHttpClient(pool); @@ -184,7 +186,7 @@ public void releasesConnectionWhenResponseFailsBeforeHeaders() { @Test public void terminatesConnectionDueToUnsubscribedBody() { TestPublisher testPublisher = TestPublisher.create(); - Connection connection = mockConnection(Observable.just(LiveHttpResponse.response(OK).body(new ByteStream(testPublisher)).build())); + Connection connection = mockConnection(Flux.just(LiveHttpResponse.response(OK).body(new ByteStream(testPublisher)).build())); ConnectionPool pool = mockPool(connection); AtomicReference receivedResponse = new AtomicReference<>(); @@ -212,7 +214,7 @@ public void closesTheConnectionPool() { verify(pool).close(); } - Connection mockConnection(Observable responseObservable) { + Connection mockConnection(Flux responseObservable) { Connection connection = mock(Connection.class); when(connection.write(any(LiveHttpRequest.class))).thenReturn(responseObservable); return connection; diff --git a/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagatorTest.java b/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagatorTest.java index 08c3825c2b..d9320b54db 100644 --- a/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagatorTest.java +++ b/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagatorTest.java @@ -33,7 +33,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; -import rx.Subscriber; +import reactor.core.publisher.FluxSink; import rx.observers.TestSubscriber; import java.io.PrintWriter; @@ -85,37 +85,37 @@ public void setUp() { @Test public void notifiesSubscriberForNettyPipelineExceptions() { - Subscriber subscriber = mock(Subscriber.class); + FluxSink subscriber = mock(FluxSink.class); NettyToStyxResponsePropagator handler = new NettyToStyxResponsePropagator(subscriber, SOME_ORIGIN); EmbeddedChannel channel = new EmbeddedChannel(handler); channel.pipeline().fireExceptionCaught(new RuntimeException("Error")); ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); - verify(subscriber, times(1)).onError(captor.capture()); + verify(subscriber, times(1)).error(captor.capture()); assertThat(captor.getValue(), is(instanceOf(BadHttpResponseException.class))); } @Test public void propagatesExceptionWhenThereIsDecodeErrorInReceivedResponse() throws Exception { - Subscriber subscriber = mock(Subscriber.class); + FluxSink subscriber = mock(FluxSink.class); NettyToStyxResponsePropagator handler = new NettyToStyxResponsePropagator(subscriber, SOME_ORIGIN); EmbeddedChannel channel = new EmbeddedChannel(handler); channel.writeInbound(newCorruptedResponse()); - verify(subscriber).onError(any(BadHttpResponseException.class)); + verify(subscriber).error(any(BadHttpResponseException.class)); } @Test public void notifiesSubscriberWhenChannelBecomesInactive() throws Exception { - Subscriber subscriber = mock(Subscriber.class); + FluxSink subscriber = mock(FluxSink.class); NettyToStyxResponsePropagator handler = new NettyToStyxResponsePropagator(subscriber, SOME_ORIGIN); EmbeddedChannel channel = new EmbeddedChannel(handler); channel.pipeline().fireChannelInactive(); - verify(subscriber).onError(any(TransportLostException.class)); + verify(subscriber).error(any(TransportLostException.class)); } @Test diff --git a/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/RequestsToOriginMetricsCollectorTest.java b/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/RequestsToOriginMetricsCollectorTest.java index ac2565c28a..422fbad169 100644 --- a/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/RequestsToOriginMetricsCollectorTest.java +++ b/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/RequestsToOriginMetricsCollectorTest.java @@ -36,7 +36,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import rx.Subscriber; +import reactor.core.publisher.FluxSink; import java.util.List; import java.util.Optional; @@ -119,7 +119,7 @@ private EmbeddedChannel buildEmbeddedChannel() { return new EmbeddedChannel( new HttpClientCodec(), new RequestsToOriginMetricsCollector(originMetrics), - new NettyToStyxResponsePropagator(mock(Subscriber.class), this.origin) + new NettyToStyxResponsePropagator(mock(FluxSink.class), this.origin) ); } From b1e6c68ac43c64dbc632e5c7790d9d1720d59271 Mon Sep 17 00:00:00 2001 From: OwenLindsell Date: Fri, 22 Nov 2019 10:25:49 +0000 Subject: [PATCH 02/12] committing broken code to share with Mikko --- .../com/hotels/styx/client/Connection.java | 7 +- .../styx/client/StyxHostHttpClient.java | 4 +- .../hotels/styx/client/StyxHttpClient.java | 18 +- .../connectionpool/ExpiringConnection.java | 9 +- .../stubs/StubConnectionFactory.java | 7 +- .../FlowControllingHttpContentProducer.java | 2 +- .../connectionpool/HttpRequestOperation.java | 36 ++-- .../netty/connectionpool/NettyConnection.java | 5 +- .../NettyToStyxResponsePropagator.java | 28 ++- .../styx/client/StyxHostHttpClientTest.java | 21 +- .../NettyToStyxResponsePropagatorTest.java | 196 +++++------------- .../RequestsToOriginMetricsCollectorTest.java | 4 +- 12 files changed, 130 insertions(+), 207 deletions(-) diff --git a/components/client/src/main/java/com/hotels/styx/client/Connection.java b/components/client/src/main/java/com/hotels/styx/client/Connection.java index a1d04d90fb..f80eb652c2 100644 --- a/components/client/src/main/java/com/hotels/styx/client/Connection.java +++ b/components/client/src/main/java/com/hotels/styx/client/Connection.java @@ -18,8 +18,9 @@ import com.hotels.styx.api.LiveHttpRequest; import com.hotels.styx.api.LiveHttpResponse; import com.hotels.styx.api.extension.Origin; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import rx.Observable; import java.io.Closeable; import java.util.EventListener; @@ -47,9 +48,9 @@ interface Factory { * Writes HTTP request to a remote peer in the context of this connection. * * @param request - * @return an observable that provides the response + * @return a Publisher that provides the response */ - Observable write(LiveHttpRequest request); + Flux write(LiveHttpRequest request); /** * Returns if the underlying connection is still active. diff --git a/components/client/src/main/java/com/hotels/styx/client/StyxHostHttpClient.java b/components/client/src/main/java/com/hotels/styx/client/StyxHostHttpClient.java index 880c5ca9a3..857909879e 100644 --- a/components/client/src/main/java/com/hotels/styx/client/StyxHostHttpClient.java +++ b/components/client/src/main/java/com/hotels/styx/client/StyxHostHttpClient.java @@ -23,7 +23,6 @@ import com.hotels.styx.client.connectionpool.ConnectionPool; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; -import rx.RxReactiveStreams; import static java.util.Objects.requireNonNull; @@ -44,9 +43,8 @@ public static StyxHostHttpClient create(ConnectionPool pool) { public Publisher sendRequest(LiveHttpRequest request) { return Flux.from(pool.borrowConnection()) .flatMap(connection -> { - Publisher write = RxReactiveStreams.toPublisher(connection.write(request)); - return ResponseEventListener.from(write) + return ResponseEventListener.from(connection.write(request)) .whenCancelled(() -> pool.closeConnection(connection)) .whenResponseError(cause -> pool.closeConnection(connection)) .whenContentError(cause -> pool.closeConnection(connection)) diff --git a/components/client/src/main/java/com/hotels/styx/client/StyxHttpClient.java b/components/client/src/main/java/com/hotels/styx/client/StyxHttpClient.java index 61ed65a842..3293d59d71 100644 --- a/components/client/src/main/java/com/hotels/styx/client/StyxHttpClient.java +++ b/components/client/src/main/java/com/hotels/styx/client/StyxHttpClient.java @@ -27,12 +27,13 @@ import com.hotels.styx.client.netty.connectionpool.NettyConnectionFactory; import com.hotels.styx.client.ssl.SslContextFactory; import io.netty.handler.ssl.SslContext; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import rx.RxReactiveStreams; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static com.google.common.base.Preconditions.checkArgument; import static com.hotels.styx.api.HttpHeaderNames.HOST; @@ -122,14 +123,23 @@ static Mono sendRequestInternal(NettyConnectionFactory connect new ConnectionSettings(params.connectTimeoutMillis()), sslContext ).flatMap(connection -> - Mono.from(RxReactiveStreams.toPublisher( - connection.write(networkRequest) - .doOnTerminate(connection::close))) + Mono.from(connection.write(networkRequest) + .doOnTerminate(() -> { + // @Mikko, In the new code this gets called for each of the ctx.close() calls in ExcessConnectionRejector. However, it + // does not get called when the streams complete - I think because they are cancelled, not completed. + // In the old rxJava code, this gets called 10 times to close all 10 connections. I think this is + // because the streams complete successfully. You will see a call made to FlowControllingHttpContentProducer.contentEndEventWhileStreaming + // in the old code, but not in the new code. + System.out.println("****TERMINATING: " + count.incrementAndGet()); + connection.close(); + }) + ) ); return responseObservable; } + static AtomicInteger count = new AtomicInteger(); private static LiveHttpRequest addUserAgent(String userAgent, LiveHttpRequest request) { if (userAgent != null) { diff --git a/components/client/src/main/java/com/hotels/styx/client/connectionpool/ExpiringConnection.java b/components/client/src/main/java/com/hotels/styx/client/connectionpool/ExpiringConnection.java index ca8395bc83..4d71742dc4 100644 --- a/components/client/src/main/java/com/hotels/styx/client/connectionpool/ExpiringConnection.java +++ b/components/client/src/main/java/com/hotels/styx/client/connectionpool/ExpiringConnection.java @@ -1,5 +1,5 @@ /* - Copyright (C) 2013-2018 Expedia Inc. + Copyright (C) 2013-2019 Expedia Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -19,9 +19,10 @@ import com.google.common.base.Ticker; import com.hotels.styx.api.LiveHttpRequest; import com.hotels.styx.api.LiveHttpResponse; -import com.hotels.styx.client.Connection; import com.hotels.styx.api.extension.Origin; -import rx.Observable; +import com.hotels.styx.client.Connection; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import java.util.function.Supplier; @@ -53,7 +54,7 @@ public boolean isConnected() { } @Override - public Observable write(LiveHttpRequest request) { + public Flux write(LiveHttpRequest request) { return nettyConnection.write(request); } diff --git a/components/client/src/main/java/com/hotels/styx/client/connectionpool/stubs/StubConnectionFactory.java b/components/client/src/main/java/com/hotels/styx/client/connectionpool/stubs/StubConnectionFactory.java index 66e9919b74..46b699cf5c 100644 --- a/components/client/src/main/java/com/hotels/styx/client/connectionpool/stubs/StubConnectionFactory.java +++ b/components/client/src/main/java/com/hotels/styx/client/connectionpool/stubs/StubConnectionFactory.java @@ -1,5 +1,5 @@ /* - Copyright (C) 2013-2018 Expedia Inc. + Copyright (C) 2013-2019 Expedia Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -21,8 +21,9 @@ import com.hotels.styx.api.extension.Origin; import com.hotels.styx.client.Connection; import com.hotels.styx.client.ConnectionSettings; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import rx.Observable; import static com.google.common.base.Objects.toStringHelper; @@ -50,7 +51,7 @@ public StubConnection(Origin origin) { } @Override - public Observable write(LiveHttpRequest request) { + public Flux write(LiveHttpRequest request) { throw new UnsupportedOperationException("Not implemented"); } diff --git a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/FlowControllingHttpContentProducer.java b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/FlowControllingHttpContentProducer.java index 7445c71fc5..c63f04a7d6 100644 --- a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/FlowControllingHttpContentProducer.java +++ b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/FlowControllingHttpContentProducer.java @@ -296,7 +296,7 @@ private ProducerState contentEndEventWhileStreaming(ContentEndEvent event) { if (readQueue.size() > 0) { return EMITTING_BUFFERED_CONTENT; } else { - this.contentSubscriber.onCompleted(); + this.contentSubscriber.onCompleted(); // @Mikko, this get called in the old code, but not the new code this.onCompleteAction.run(); return COMPLETED; } diff --git a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java index 7476179056..ca31645e2e 100644 --- a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java +++ b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java @@ -37,7 +37,10 @@ import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.HttpObject; import io.netty.handler.timeout.IdleStateHandler; +import org.reactivestreams.Publisher; import org.slf4j.Logger; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import rx.Observable; import rx.Subscriber; @@ -127,17 +130,17 @@ private static boolean requestIsOngoing(RequestBodyChunkSubscriber bodyChunkSubs return bodyChunkSubscriber != null && bodyChunkSubscriber.requestIsOngoing(); } - public Observable execute(NettyConnection nettyConnection) { + public Flux execute(NettyConnection nettyConnection) { AtomicReference requestRequestBodyChunkSubscriber = new AtomicReference<>(); requestTime = System.currentTimeMillis(); executeCount.incrementAndGet(); - Observable observable = Observable.create(subscriber -> { + Flux responseFlux = Flux.create(sink -> { if (nettyConnection.isConnected()) { RequestBodyChunkSubscriber bodyChunkSubscriber = new RequestBodyChunkSubscriber(request, nettyConnection); requestRequestBodyChunkSubscriber.set(bodyChunkSubscriber); - addProxyBridgeHandlers(nettyConnection, subscriber); - new WriteRequestToOrigin(subscriber, nettyConnection, request, bodyChunkSubscriber) + addProxyBridgeHandlers(nettyConnection, sink); + new WriteRequestToOrigin(sink, nettyConnection, request, bodyChunkSubscriber) .write(); if (requestLoggingEnabled) { httpRequestMessageLogger.logRequest(request, nettyConnection.getOrigin()); @@ -145,14 +148,19 @@ public Observable execute(NettyConnection nettyConnection) { } }); + responseFlux = responseFlux.doOnCancel(() -> { + // @Mikko, In the old rxJava code the equivalent of doOnCancel is doOnUnsubscribe, which would also get called here. + // So maybe the problem isn't that it's being cancelled, because the same behaviour is in the old and new code. + System.out.println("****Cancelling"); + }); + if (requestLoggingEnabled) { - observable = observable + responseFlux = responseFlux .doOnNext(response -> { httpRequestMessageLogger.logResponse(request, response); }); } - - return observable.map(response -> + return responseFlux.map(response -> Requests.doFinally(response, cause -> { if (nettyConnection.isConnected()) { removeProxyBridgeHandlers(nettyConnection); @@ -166,7 +174,7 @@ public Observable execute(NettyConnection nettyConnection) { })); } - private void addProxyBridgeHandlers(NettyConnection nettyConnection, Subscriber observer) { + private void addProxyBridgeHandlers(NettyConnection nettyConnection, FluxSink sink) { Origin origin = nettyConnection.getOrigin(); Channel channel = nettyConnection.channel(); channel.pipeline().addLast(IDLE_HANDLER_NAME, new IdleStateHandler(0, 0, responseTimeoutMillis, MILLISECONDS)); @@ -176,7 +184,7 @@ private void addProxyBridgeHandlers(NettyConnection nettyConnection, Subscriber< new RequestsToOriginMetricsCollector(originStatsFactory.originStats(origin)))); channel.pipeline().addLast( NettyToStyxResponsePropagator.NAME, - new NettyToStyxResponsePropagator(observer, origin, responseTimeoutMillis, MILLISECONDS, request)); + new NettyToStyxResponsePropagator(sink, origin, responseTimeoutMillis, MILLISECONDS, request)); } private void removeProxyBridgeHandlers(NettyConnection connection) { @@ -221,14 +229,14 @@ public String toString() { } private static final class WriteRequestToOrigin { - private final Subscriber responseFromOriginObserver; + private final FluxSink responseFromOriginFlux; private final NettyConnection nettyConnection; private final LiveHttpRequest request; private final RequestBodyChunkSubscriber requestBodyChunkSubscriber; - private WriteRequestToOrigin(Subscriber responseFromOriginObserver, NettyConnection nettyConnection, LiveHttpRequest request, + private WriteRequestToOrigin(FluxSink responseFromOriginFlux, NettyConnection nettyConnection, LiveHttpRequest request, RequestBodyChunkSubscriber requestBodyChunkSubscriber) { - this.responseFromOriginObserver = responseFromOriginObserver; + this.responseFromOriginFlux = responseFromOriginFlux; this.nettyConnection = nettyConnection; this.request = request; this.requestBodyChunkSubscriber = requestBodyChunkSubscriber; @@ -242,7 +250,7 @@ public void write() { originChannel.writeAndFlush(messageHeaders) .addListener(subscribeToRequestBody()); } else { - responseFromOriginObserver.onError(new TransportLostException(originChannel.remoteAddress(), nettyConnection.getOrigin())); + responseFromOriginFlux.error(new TransportLostException(originChannel.remoteAddress(), nettyConnection.getOrigin())); } } @@ -256,7 +264,7 @@ private ChannelFutureListener subscribeToRequestBody() { String channelIdentifier = String.format("%s -> %s", nettyConnection.channel().localAddress(), nettyConnection.channel().remoteAddress()); LOGGER.error(format("Failed to send request headers. origin=%s connection=%s request=%s", nettyConnection.getOrigin(), channelIdentifier, request), headersFuture.cause()); - responseFromOriginObserver.onError(new TransportLostException(nettyConnection.channel().remoteAddress(), nettyConnection.getOrigin())); + responseFromOriginFlux.error(new TransportLostException(nettyConnection.channel().remoteAddress(), nettyConnection.getOrigin())); } }; } diff --git a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyConnection.java b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyConnection.java index e718a3d358..12b3b5287e 100644 --- a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyConnection.java +++ b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyConnection.java @@ -29,7 +29,8 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.util.AttributeKey; -import rx.Observable; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -91,7 +92,7 @@ private static void addChannelHandlers(Channel channel, HttpConfig httpConfig, S } @Override - public Observable write(LiveHttpRequest request) { + public Flux write(LiveHttpRequest request) { return this.requestOperationFactory.newHttpRequestOperation(request).execute(this); } diff --git a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java index ab9a0b2c5e..d6ca1baa78 100644 --- a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java +++ b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java @@ -34,6 +34,7 @@ import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.timeout.IdleStateEvent; import org.slf4j.Logger; +import reactor.core.publisher.FluxSink; import rx.Observable; import rx.Producer; import rx.Subscriber; @@ -42,8 +43,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static com.hotels.styx.api.LiveHttpResponse.response; import static com.hotels.styx.api.HttpResponseStatus.statusWithCode; +import static com.hotels.styx.api.LiveHttpResponse.response; import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION; import static io.netty.util.ReferenceCountUtil.retain; import static java.lang.String.format; @@ -60,7 +61,7 @@ final class NettyToStyxResponsePropagator extends SimpleChannelInboundHandler { private static final Logger LOGGER = getLogger(NettyToStyxResponsePropagator.class); private final AtomicBoolean responseCompleted = new AtomicBoolean(false); - private final Subscriber responseObserver; + private final FluxSink sink; private final LiveHttpRequest request; private final Origin origin; @@ -71,23 +72,16 @@ final class NettyToStyxResponsePropagator extends SimpleChannelInboundHandler { // to be delivered from the same thread. private boolean toBeClosed; - NettyToStyxResponsePropagator(Subscriber responseObserver, Origin origin) { - this(responseObserver, origin, 5L, TimeUnit.SECONDS); - } - - NettyToStyxResponsePropagator(Subscriber responseObserver, - Origin origin, - long idleTimeout, - TimeUnit timeUnit) { - this(responseObserver, origin, idleTimeout, timeUnit, null); + NettyToStyxResponsePropagator(FluxSink sink, Origin origin) { + this(sink, origin, 5L, TimeUnit.SECONDS, null); } - NettyToStyxResponsePropagator(Subscriber responseObserver, + NettyToStyxResponsePropagator(FluxSink sink, Origin origin, long idleTimeout, TimeUnit timeUnit, LiveHttpRequest request) { - this.responseObserver = responseObserver; + this.sink = sink; this.origin = origin; this.idleTimeoutMillis = timeUnit.toMillis(idleTimeout); this.request = request; @@ -150,7 +144,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Except } LiveHttpResponse response = toStyxResponse(nettyResponse, contentObservable, origin); - this.responseObserver.onNext(response); + this.sink.next(response); } if (msg instanceof HttpContent) { ByteBuf content = ((ByteBufHolder) msg).content(); @@ -210,13 +204,15 @@ private FlowControllingHttpContentProducer createProducer(ChannelHandlerContext private void emitResponseCompleted() { if (responseCompleted.compareAndSet(false, true)) { - responseObserver.onCompleted(); + // @Mikko, Sink seems to always be cancelled. I think this is what is causing the doOnTerminate (line 127 in StyxHttpClient) not to be executed + System.out.println("Sink cancelled state: " + sink.isCancelled()); + sink.complete(); } } private void emitResponseError(Throwable cause) { if (responseCompleted.compareAndSet(false, true)) { - this.responseObserver.onError(cause); + this.sink.error(cause); } } diff --git a/components/client/src/test/unit/java/com/hotels/styx/client/StyxHostHttpClientTest.java b/components/client/src/test/unit/java/com/hotels/styx/client/StyxHostHttpClientTest.java index 68e8b14b79..ed86db3117 100644 --- a/components/client/src/test/unit/java/com/hotels/styx/client/StyxHostHttpClientTest.java +++ b/components/client/src/test/unit/java/com/hotels/styx/client/StyxHostHttpClientTest.java @@ -26,11 +26,11 @@ import org.junit.jupiter.api.Test; import org.reactivestreams.Subscription; import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; -import rx.Observable; -import rx.subjects.PublishSubject; import java.util.concurrent.atomic.AtomicReference; @@ -41,13 +41,12 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static rx.Observable.just; - +import static reactor.core.publisher.Flux.just; public class StyxHostHttpClientTest { private LiveHttpRequest request; private LiveHttpResponse response; - private PublishSubject responseProvider; + private EmitterProcessor responseProvider; @BeforeEach public void setUp() { @@ -57,7 +56,7 @@ public void setUp() { .body("xyz", UTF_8) .build() .stream(); - responseProvider = PublishSubject.create(); + responseProvider = EmitterProcessor.create(); } @Test @@ -107,7 +106,7 @@ public void ignoresCancelledHeaders() { @Test public void releasesIfRequestIsCancelledBeforeHeaders() { - Connection connection = mockConnection(PublishSubject.create()); + Connection connection = mockConnection(EmitterProcessor.create()); ConnectionPool pool = mockPool(connection); StyxHostHttpClient hostClient = new StyxHostHttpClient(pool); @@ -152,7 +151,7 @@ public void ignoresResponseObservableErrorsAfterHeaders() { @Test public void terminatesConnectionWhenResponseObservableCompletesWithoutHeaders() { // A connection that yields no response: - Connection connection = mockConnection(Observable.empty()); + Connection connection = mockConnection(Flux.empty()); ConnectionPool pool = mockPool(connection); StyxHostHttpClient hostClient = new StyxHostHttpClient(pool); @@ -168,7 +167,7 @@ public void terminatesConnectionWhenResponseObservableCompletesWithoutHeaders() @Test public void releasesConnectionWhenResponseFailsBeforeHeaders() { - Connection connection = mockConnection(Observable.error(new RuntimeException())); + Connection connection = mockConnection(Flux.error(new RuntimeException())); ConnectionPool pool = mockPool(connection); StyxHostHttpClient hostClient = new StyxHostHttpClient(pool); @@ -184,7 +183,7 @@ public void releasesConnectionWhenResponseFailsBeforeHeaders() { @Test public void terminatesConnectionDueToUnsubscribedBody() { TestPublisher testPublisher = TestPublisher.create(); - Connection connection = mockConnection(Observable.just(LiveHttpResponse.response(OK).body(new ByteStream(testPublisher)).build())); + Connection connection = mockConnection(just(LiveHttpResponse.response(OK).body(new ByteStream(testPublisher)).build())); ConnectionPool pool = mockPool(connection); AtomicReference receivedResponse = new AtomicReference<>(); @@ -212,7 +211,7 @@ public void closesTheConnectionPool() { verify(pool).close(); } - Connection mockConnection(Observable responseObservable) { + Connection mockConnection(Flux responseObservable) { Connection connection = mock(Connection.class); when(connection.write(any(LiveHttpRequest.class))).thenReturn(responseObservable); return connection; diff --git a/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagatorTest.java b/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagatorTest.java index 08c3825c2b..680bbe910c 100644 --- a/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagatorTest.java +++ b/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagatorTest.java @@ -16,7 +16,6 @@ package com.hotels.styx.client.netty.connectionpool; import com.google.common.base.Throwables; -import com.hotels.styx.api.Buffers; import com.hotels.styx.api.LiveHttpResponse; import com.hotels.styx.api.exceptions.ResponseTimeoutException; import com.hotels.styx.api.exceptions.TransportLostException; @@ -33,14 +32,11 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; -import rx.Subscriber; -import rx.observers.TestSubscriber; +import reactor.core.publisher.FluxSink; +import reactor.test.StepVerifier; -import java.io.PrintWriter; -import java.io.StringWriter; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; -import java.util.List; import java.util.Optional; import static com.google.common.base.Charsets.UTF_8; @@ -56,22 +52,24 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; import static io.netty.handler.codec.http.LastHttpContent.EMPTY_LAST_CONTENT; import static io.netty.handler.timeout.IdleStateEvent.ALL_IDLE_STATE_EVENT; -import static java.lang.Long.MAX_VALUE; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atMostOnce; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static rx.RxReactiveStreams.toObservable; public class NettyToStyxResponsePropagatorTest { - private ByteBuf firstContentChunk = copiedBuffer("first chunk", UTF_8); - private ByteBuf secondContentChunk = copiedBuffer("second chunk", UTF_8); - private TestSubscriber responseSubscriber; + private static final String FIRST_CHUNK = "first chunk"; + private static final String SECOND_CHUNK = "second chunk"; + private ByteBuf firstContentChunk = copiedBuffer(FIRST_CHUNK, UTF_8); + private ByteBuf secondContentChunk = copiedBuffer(SECOND_CHUNK, UTF_8); + private FluxSink responseSubscriber; private DefaultHttpResponse httpResponseHeaders = new DefaultHttpResponse(HTTP_1_1, OK); private DefaultHttpContent httpContentOne = new DefaultHttpContent(firstContentChunk); private DefaultHttpContent httpContentTwo = new DefaultHttpContent(secondContentChunk); @@ -80,60 +78,39 @@ public class NettyToStyxResponsePropagatorTest { @BeforeEach public void setUp() { - responseSubscriber = new TestSubscriber<>(); + responseSubscriber = mock(FluxSink.class); } @Test public void notifiesSubscriberForNettyPipelineExceptions() { - Subscriber subscriber = mock(Subscriber.class); - NettyToStyxResponsePropagator handler = new NettyToStyxResponsePropagator(subscriber, SOME_ORIGIN); + NettyToStyxResponsePropagator handler = new NettyToStyxResponsePropagator(responseSubscriber, SOME_ORIGIN); EmbeddedChannel channel = new EmbeddedChannel(handler); channel.pipeline().fireExceptionCaught(new RuntimeException("Error")); ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); - verify(subscriber, times(1)).onError(captor.capture()); + verify(responseSubscriber, times(1)).error(captor.capture()); assertThat(captor.getValue(), is(instanceOf(BadHttpResponseException.class))); } @Test public void propagatesExceptionWhenThereIsDecodeErrorInReceivedResponse() throws Exception { - Subscriber subscriber = mock(Subscriber.class); - NettyToStyxResponsePropagator handler = new NettyToStyxResponsePropagator(subscriber, SOME_ORIGIN); + NettyToStyxResponsePropagator handler = new NettyToStyxResponsePropagator(responseSubscriber, SOME_ORIGIN); EmbeddedChannel channel = new EmbeddedChannel(handler); channel.writeInbound(newCorruptedResponse()); - verify(subscriber).onError(any(BadHttpResponseException.class)); + verify(responseSubscriber).error(any(BadHttpResponseException.class)); } @Test public void notifiesSubscriberWhenChannelBecomesInactive() throws Exception { - Subscriber subscriber = mock(Subscriber.class); - NettyToStyxResponsePropagator handler = new NettyToStyxResponsePropagator(subscriber, SOME_ORIGIN); - EmbeddedChannel channel = new EmbeddedChannel(handler); - - channel.pipeline().fireChannelInactive(); - - verify(subscriber).onError(any(TransportLostException.class)); - } - - @Test - public void ignoresChannelInactiveEventAfterResponseIsCompleted() throws Exception { NettyToStyxResponsePropagator handler = new NettyToStyxResponsePropagator(responseSubscriber, SOME_ORIGIN); EmbeddedChannel channel = new EmbeddedChannel(handler); - channel.writeInbound(httpResponseHeaders); - channel.writeInbound(newHttpContent("one")); - - LiveHttpResponse response = responseSubscriber.getOnNextEvents().get(0); - subscribeToContent(response); - - channel.writeInbound(EMPTY_LAST_CONTENT); - assertThat(responseSubscriber.getOnCompletedEvents().size(), is(1)); - channel.pipeline().fireChannelInactive(); - assertThat(responseSubscriber.getOnErrorEvents().size(), is(0)); + + verify(responseSubscriber).error(any(TransportLostException.class)); } @Test @@ -142,16 +119,17 @@ public void doesNotPropagateErrorsTwice() throws Exception { EmbeddedChannel channel = new EmbeddedChannel(handler); channel.writeInbound(httpResponseHeaders); - channel.writeInbound(newHttpContent("one")); - LiveHttpResponse response = responseSubscriber.getOnNextEvents().get(0); - subscribeToContent(response); + LiveHttpResponse response = verifyNextCalledOnResponseSubscriber(); - channel.pipeline().fireExceptionCaught(new RuntimeException("Simulated exception: something went horribly wrong!")); - assertThat(responseSubscriber.getOnErrorEvents().size(), is(1)); + StepVerifier.create(response.body()) + .then(channel::runPendingTasks) // Execute onSubscribe in FSM + .then(() -> channel.pipeline().fireExceptionCaught(new RuntimeException())) // Will emmit BadHttpResponseException + .then(() -> channel.pipeline().fireChannelInactive()) // Will emmit TransportLostException + .expectError(BadHttpResponseException.class) + .verify(); - channel.pipeline().fireChannelInactive(); - assertThat(responseSubscriber.getOnErrorEvents().size(), is(1)); + verify(responseSubscriber, atMostOnce()).error(any()); } @@ -159,41 +137,14 @@ public void doesNotPropagateErrorsTwice() throws Exception { public void handlesIdleStateEvent() throws Exception { EmbeddedChannel channel = new EmbeddedChannel(new NettyToStyxResponsePropagator(responseSubscriber, SOME_ORIGIN)); channel.writeInbound(httpResponseHeaders); - channel.writeInbound(newHttpContent("one")); - - channel.pipeline().fireUserEventTriggered(ALL_IDLE_STATE_EVENT); - - List errors = responseSubscriber.getOnErrorEvents(); - assertThat(errors, hasSize(1)); - assertThat(errors.get(0), is(instanceOf(ResponseTimeoutException.class))); - } - - - @Test - public void pushesHttpContentOnChannelReadComplete() { - EmbeddedChannel channel = new EmbeddedChannel(new NettyToStyxResponsePropagator(responseSubscriber, SOME_ORIGIN)); - - channel.writeInbound(httpResponseHeaders); - LiveHttpResponse response = onNextEvent(responseSubscriber, 0); - channel.runPendingTasks(); - - TestSubscriber contentSubscriber = subscribeToContent(response); - channel.runPendingTasks(); - HttpContent contentOne = newHttpContent("one"); - channel.writeInbound(contentOne); - channel.runPendingTasks(); + LiveHttpResponse response = verifyNextCalledOnResponseSubscriber(); - HttpContent contentTwo = newHttpContent("two"); - channel.writeInbound(contentTwo); - channel.runPendingTasks(); - - channel.pipeline().fireChannelReadComplete(); - channel.runPendingTasks(); - - assertNoErrors(contentSubscriber); - assertThat(asString(onNextEvent(contentSubscriber, 0)), is(asString(contentOne.content()))); - assertThat(asString(onNextEvent(contentSubscriber, 1)), is(asString(contentTwo.content()))); + StepVerifier.create(response.body()) + .then(channel::runPendingTasks) // Execute onSubscribe in FSM + .then(() -> channel.pipeline().fireUserEventTriggered(ALL_IDLE_STATE_EVENT)) + .expectError(ResponseTimeoutException.class) + .verify(); } @@ -204,17 +155,13 @@ public void pushesContentWhenObserverSubscribes() throws Exception { channel.writeInbound(httpContentOne); channel.writeInbound(httpContentTwo); - assertThat(onCompletedEvents(responseSubscriber), is(0)); - assertThat(onNextEvents(responseSubscriber), is(1)); - LiveHttpResponse response = onNextEvent(responseSubscriber, 0); - - TestSubscriber contentSubscriber = subscribeToContent(response); - channel.runPendingTasks(); + LiveHttpResponse response = verifyNextCalledOnResponseSubscriber(); - assertNoErrors(contentSubscriber); - assertThat(onNextEvents(contentSubscriber), is(2)); - assertThat(contentChunk(contentSubscriber, 0), is(asString(httpContentOne.content()))); - assertThat(contentChunk(contentSubscriber, 1), is(asString(httpContentTwo.content()))); + StepVerifier.create(response.body()) + .then(channel::runPendingTasks) + .assertNext(buf -> assertEquals(FIRST_CHUNK, new String(buf.content()))) + .assertNext(buf -> assertEquals(SECOND_CHUNK, new String(buf.content()))) + .thenCancel(); } @@ -226,9 +173,9 @@ public void doesNotCompleteResponseObservableIfContentHasNotBeenSubscribed() thr channel.writeInbound(httpContentTwo); channel.writeInbound(EMPTY_LAST_CONTENT); - assertNoErrors(responseSubscriber); - assertThat(onNextEvents(responseSubscriber), is(1)); - assertThat(onCompletedEvents(responseSubscriber), is(0)); + verifyNextCalledOnResponseSubscriber(); + verify(responseSubscriber, never()).error(any()); + verify(responseSubscriber, never()).complete(); } @Test @@ -238,12 +185,11 @@ public void completesContentObservableOnLastHttpContent() throws Exception { channel.writeInbound(httpResponseHeaders); channel.writeInbound(EMPTY_LAST_CONTENT); - LiveHttpResponse response = onNextEvent(responseSubscriber, 0); - TestSubscriber contentSubscriber = subscribeToContent(response); - channel.runPendingTasks(); + LiveHttpResponse response = verifyNextCalledOnResponseSubscriber(); - assertNoErrors(contentSubscriber); - assertThat(onCompletedEvents(contentSubscriber), is(1)); + StepVerifier.create(response.body()) + .then(channel::runPendingTasks) + .verifyComplete(); } @Test @@ -290,13 +236,13 @@ public void shouldReleaseAlreadyReadBufferInCaseOfError() throws Exception { } @Test - public void mapsOutOfDirectMemoryExceptionsToResourceExhaustedException() throws Exception { + public void mapsOutOfDirectMemoryExceptionsToStyxClientException() throws Exception { EmbeddedChannel channel = new EmbeddedChannel(new NettyToStyxResponsePropagator(responseSubscriber, SOME_ORIGIN)); channel.writeInbound(new DefaultHttpResponse(HTTP_1_1, OK)); channel.pipeline().fireExceptionCaught(newOutOfDirectMemoryError("Simulated out of direct memory error in a test.")); - assertThat(responseSubscriber.getOnErrorEvents().get(0), is(instanceOf(StyxClientException.class))); + verify(responseSubscriber).error(any(StyxClientException.class)); } private OutOfDirectMemoryError newOutOfDirectMemoryError(String message) throws IllegalAccessException, InvocationTargetException, InstantiationException { @@ -314,7 +260,8 @@ private OutOfDirectMemoryError newOutOfDirectMemoryError(String message) throws @Test public void shouldReleaseAlreadyReadBufferInCaseOfChannelGetsInactive() throws Exception { - EmbeddedChannel channel = new EmbeddedChannel(new NettyToStyxResponsePropagator(responseSubscriber, SOME_ORIGIN)); + FluxSink subscriber = mock(FluxSink.class); + EmbeddedChannel channel = new EmbeddedChannel(new NettyToStyxResponsePropagator(subscriber, SOME_ORIGIN)); channel.writeInbound(new DefaultHttpResponse(HTTP_1_1, OK)); HttpContent httpContentOne = newHttpContent("first chunk"); @@ -367,48 +314,9 @@ private static HttpContent newHttpContent(String content) { return new DefaultHttpContent(copiedBuffer(content, UTF_8)); } - - private TestSubscriber subscribeToContent(LiveHttpResponse response) { - TestSubscriber contentSubscriber = new TestSubscriber<>(MAX_VALUE); - toObservable(response.body()) - .map(Buffers::toByteBuf) - .subscribe(contentSubscriber); - return contentSubscriber; - } - - private void assertNoErrors(TestSubscriber subscriber) { - String message = "\nExpected no errors, but got at least: \n"; - if (subscriber.getOnErrorEvents().size() > 0) { - message = message + stackTrace((Throwable) subscriber.getOnErrorEvents().get(0)); - } - assertThat(message, subscriber.getOnErrorEvents().size(), is(0)); + private LiveHttpResponse verifyNextCalledOnResponseSubscriber() { + ArgumentCaptor responseArg = ArgumentCaptor.forClass(LiveHttpResponse.class); + verify(responseSubscriber).next(responseArg.capture()); + return responseArg.getValue(); } - - private String stackTrace(Throwable e) { - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - e.printStackTrace(pw); - return sw.toString(); - } - - private int onNextEvents(TestSubscriber subscriber) { - return subscriber.getOnNextEvents().size(); - } - - private int onCompletedEvents(TestSubscriber subscriber) { - return subscriber.getOnCompletedEvents().size(); - } - - private T onNextEvent(TestSubscriber subscriber, int i) { - return subscriber.getOnNextEvents().get(i); - } - - private String contentChunk(TestSubscriber contentSubscriber, int i) { - return asString(onNextEvent(contentSubscriber, i)); - } - - private String asString(ByteBuf firstContentChunk) { - return firstContentChunk.toString(UTF_8); - } - } diff --git a/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/RequestsToOriginMetricsCollectorTest.java b/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/RequestsToOriginMetricsCollectorTest.java index ac2565c28a..422fbad169 100644 --- a/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/RequestsToOriginMetricsCollectorTest.java +++ b/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/RequestsToOriginMetricsCollectorTest.java @@ -36,7 +36,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import rx.Subscriber; +import reactor.core.publisher.FluxSink; import java.util.List; import java.util.Optional; @@ -119,7 +119,7 @@ private EmbeddedChannel buildEmbeddedChannel() { return new EmbeddedChannel( new HttpClientCodec(), new RequestsToOriginMetricsCollector(originMetrics), - new NettyToStyxResponsePropagator(mock(Subscriber.class), this.origin) + new NettyToStyxResponsePropagator(mock(FluxSink.class), this.origin) ); } From b5c714ed8361092d49fdbf0fe25806649c26fb58 Mon Sep 17 00:00:00 2001 From: OwenLindsell Date: Fri, 22 Nov 2019 10:27:32 +0000 Subject: [PATCH 03/12] committing broken code to share with Mikko --- .../scala/com/hotels/styx/server/ServerConnectionsSpec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/server/ServerConnectionsSpec.scala b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/server/ServerConnectionsSpec.scala index 806ea0fc01..14a15d89fa 100644 --- a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/server/ServerConnectionsSpec.scala +++ b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/server/ServerConnectionsSpec.scala @@ -79,6 +79,7 @@ class ServerConnectionsSpec extends FunSpec }) eventually(timeout(1 second)) { + // @Mikko, this is the test that is failing getTotalConnectionsMetric.bodyAs(UTF_8) should be("{\"connections.total-connections\":{\"count\":0}}") } } From 487a8f72d6c572b782b7b62a9807260be4a7ace9 Mon Sep 17 00:00:00 2001 From: Mikko Karjalainen Date: Mon, 18 Nov 2019 15:49:23 +0000 Subject: [PATCH 04/12] Add informational message at the end of health checks. (#532) - and change its level to DEBUG. --- .../hotels/styx/services/HealthCheckMonitoringService.kt | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/components/proxy/src/main/kotlin/com/hotels/styx/services/HealthCheckMonitoringService.kt b/components/proxy/src/main/kotlin/com/hotels/styx/services/HealthCheckMonitoringService.kt index 5fa87fcadf..91f873410f 100644 --- a/components/proxy/src/main/kotlin/com/hotels/styx/services/HealthCheckMonitoringService.kt +++ b/components/proxy/src/main/kotlin/com/hotels/styx/services/HealthCheckMonitoringService.kt @@ -125,7 +125,11 @@ internal class HealthCheckMonitoringService( .flatMap { it } .collectList() .subscribe { - LOGGER.info("Health Check Completed ..") + val details = it.joinToString(", ") { (name, _, health2) -> + "{ app: $application, host: $name, result: $health2 }" + } + + LOGGER.debug("Health Check Completed: ${details}") } } From a1bbc30d9d5b5b379c8e10ea3605edac7fe351b9 Mon Sep 17 00:00:00 2001 From: Mikko Karjalainen Date: Tue, 19 Nov 2019 21:05:33 +0000 Subject: [PATCH 05/12] Common test result reporter for Styx Kotlin projects. (#533) --- .../FlowControllingHttpContentProducer.java | 2 +- .../io/kotlintest/provided/ProjectConfig.kt | 24 ++++++++++ support/api-testsupport/pom.xml | 48 +++++++++++++++++++ .../hotels/styx/support/TestResultReporter.kt | 27 +++++++---- .../io/kotlintest/provided/ProjectConfig.kt | 4 +- 5 files changed, 92 insertions(+), 13 deletions(-) create mode 100644 components/proxy/src/test/kotlin/io/kotlintest/provided/ProjectConfig.kt rename system-tests/ft-suite/src/test/kotlin/com/hotels/styx/support/TestErrorReporter.kt => support/api-testsupport/src/main/kotlin/com/hotels/styx/support/TestResultReporter.kt (66%) diff --git a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/FlowControllingHttpContentProducer.java b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/FlowControllingHttpContentProducer.java index c63f04a7d6..451fedefeb 100644 --- a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/FlowControllingHttpContentProducer.java +++ b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/FlowControllingHttpContentProducer.java @@ -151,7 +151,7 @@ enum ProducerState { */ private ProducerState rxBackpressureRequestInBuffering(RxBackpressureRequestEvent event) { // This can occur before the actual content subscribe event. This occurs if the subscriber - // has called request() before actually having subscribed to the content observable. In this + // has called request() before having subscribed to the content observable. In this // case just initialise the request count with requested N value. requested.compareAndSet(Long.MAX_VALUE, 0); getAndAddRequest(requested, event.n()); diff --git a/components/proxy/src/test/kotlin/io/kotlintest/provided/ProjectConfig.kt b/components/proxy/src/test/kotlin/io/kotlintest/provided/ProjectConfig.kt new file mode 100644 index 0000000000..1bbd905334 --- /dev/null +++ b/components/proxy/src/test/kotlin/io/kotlintest/provided/ProjectConfig.kt @@ -0,0 +1,24 @@ +/* + Copyright (C) 2013-2019 Expedia Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package io.kotlintest.provided + +import com.hotels.styx.support.TestResultReporter +import io.kotlintest.AbstractProjectConfig +import io.kotlintest.extensions.TestListener + +class ProjectConfig: AbstractProjectConfig() { + override fun listeners(): List = listOf(TestResultReporter) +} diff --git a/support/api-testsupport/pom.xml b/support/api-testsupport/pom.xml index 8d6670aceb..cbfec60ad6 100755 --- a/support/api-testsupport/pom.xml +++ b/support/api-testsupport/pom.xml @@ -33,6 +33,17 @@ styx-common + + org.jetbrains.kotlin + kotlin-stdlib-jdk8 + + + + io.kotlintest + kotlintest-runner-junit5 + compile + + io.reactivex rxjava @@ -64,6 +75,43 @@ + + + org.jetbrains.kotlin + kotlin-maven-plugin + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-kotlin-sources + generate-sources + + add-source + + + + src/main/kotlin + + + + + add-kotlin-test-sources + generate-test-sources + + add-test-source + + + + src/test/kotlin + + + + + + org.apache.maven.plugins maven-deploy-plugin diff --git a/system-tests/ft-suite/src/test/kotlin/com/hotels/styx/support/TestErrorReporter.kt b/support/api-testsupport/src/main/kotlin/com/hotels/styx/support/TestResultReporter.kt similarity index 66% rename from system-tests/ft-suite/src/test/kotlin/com/hotels/styx/support/TestErrorReporter.kt rename to support/api-testsupport/src/main/kotlin/com/hotels/styx/support/TestResultReporter.kt index bc623c9ca2..f67a531f57 100644 --- a/system-tests/ft-suite/src/test/kotlin/com/hotels/styx/support/TestErrorReporter.kt +++ b/support/api-testsupport/src/main/kotlin/com/hotels/styx/support/TestResultReporter.kt @@ -21,45 +21,52 @@ import io.kotlintest.TestResult import io.kotlintest.TestStatus import io.kotlintest.extensions.TestListener import org.slf4j.LoggerFactory +import java.io.PrintWriter +import java.io.StringWriter -object TestErrorReporter: TestListener { - val LOGGER = LoggerFactory.getLogger("StyxFT") +object TestResultReporter : TestListener { + val LOGGER = LoggerFactory.getLogger("Styx-Tests") override fun beforeSpec(spec: Spec) { super.beforeSpec(spec) - LOGGER.info("Starting ${spec.description().fullName()}") + LOGGER.info("Starting: ${spec.description().fullName()}") } override fun afterSpec(spec: Spec) { - LOGGER.info("Finished ${spec.description().fullName()}") + LOGGER.info("Finished: ${spec.description().fullName()}") super.afterSpec(spec) } override fun beforeTest(testCase: TestCase) { super.beforeTest(testCase) - LOGGER.info("Running '${testCase.name}' - ${testCase.source.fileName}:${testCase.source.lineNumber}") + LOGGER.info("Running: '${testCase.name}' - ${testCase.source.fileName}:${testCase.source.lineNumber}") } override fun afterTest(testCase: TestCase, result: TestResult) { super.afterTest(testCase, result) - LOGGER.info("${testCase.name} - ${result.status}") + LOGGER.info("Result: ${testCase.name} - ${result.status}") when (result.status) { - TestStatus.Success -> { } - TestStatus.Ignored -> { } TestStatus.Error -> { result.error?.let { LOGGER.info(it.message) - it.printStackTrace() + LOGGER.info(it.stackTrace()) } } TestStatus.Failure -> { result.error?.let { LOGGER.info(it.message) - it.printStackTrace() + LOGGER.info(it.stackTrace()) } } + else -> { } } } + + private fun Throwable.stackTrace() = StringWriter() + .let { + this.printStackTrace(PrintWriter(it)) + it.toString() + } } \ No newline at end of file diff --git a/system-tests/ft-suite/src/test/kotlin/io/kotlintest/provided/ProjectConfig.kt b/system-tests/ft-suite/src/test/kotlin/io/kotlintest/provided/ProjectConfig.kt index 799f43af2b..1bbd905334 100644 --- a/system-tests/ft-suite/src/test/kotlin/io/kotlintest/provided/ProjectConfig.kt +++ b/system-tests/ft-suite/src/test/kotlin/io/kotlintest/provided/ProjectConfig.kt @@ -15,10 +15,10 @@ */ package io.kotlintest.provided -import com.hotels.styx.support.TestErrorReporter +import com.hotels.styx.support.TestResultReporter import io.kotlintest.AbstractProjectConfig import io.kotlintest.extensions.TestListener class ProjectConfig: AbstractProjectConfig() { - override fun listeners(): List = listOf(TestErrorReporter) + override fun listeners(): List = listOf(TestResultReporter) } From 9e74ef2b14718ccc7789e734f24db526896ddabe Mon Sep 17 00:00:00 2001 From: Mikko Karjalainen Date: Wed, 20 Nov 2019 09:25:15 +0000 Subject: [PATCH 06/12] Don't deploy styx-api-testsupport jars, nor styx-support pom. (#534) --- support/api-testsupport/pom.xml | 2 +- support/pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/support/api-testsupport/pom.xml b/support/api-testsupport/pom.xml index cbfec60ad6..6e380ff342 100755 --- a/support/api-testsupport/pom.xml +++ b/support/api-testsupport/pom.xml @@ -116,7 +116,7 @@ org.apache.maven.plugins maven-deploy-plugin - false + true diff --git a/support/pom.xml b/support/pom.xml index a75ceae3b5..d8594487f2 100644 --- a/support/pom.xml +++ b/support/pom.xml @@ -27,7 +27,7 @@ org.apache.maven.plugins maven-deploy-plugin - false + true From 8fdd81d930deb83e8b75eb301e7d1fbd92d8e5d5 Mon Sep 17 00:00:00 2001 From: Robert Macaulay Date: Mon, 25 Nov 2019 01:36:01 -0600 Subject: [PATCH 07/12] Reduce exception construction (#538) Create an exception lazily only when necessary. --- .../hotels/styx/client/connectionpool/SimpleConnectionPool.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/client/src/main/java/com/hotels/styx/client/connectionpool/SimpleConnectionPool.java b/components/client/src/main/java/com/hotels/styx/client/connectionpool/SimpleConnectionPool.java index 4239249039..314ebfbbcd 100644 --- a/components/client/src/main/java/com/hotels/styx/client/connectionpool/SimpleConnectionPool.java +++ b/components/client/src/main/java/com/hotels/styx/client/connectionpool/SimpleConnectionPool.java @@ -91,7 +91,7 @@ public Publisher borrowConnection() { } }).timeout( Duration.ofMillis(poolSettings.pendingConnectionTimeoutMillis()), - Mono.error(new MaxPendingConnectionTimeoutException(origin, connectionSettings.connectTimeoutMillis()))); + Mono.error(() -> new MaxPendingConnectionTimeoutException(origin, connectionSettings.connectTimeoutMillis()))); } private void newConnection() { From 17d84515e074461540d46dd865b5385809240cf7 Mon Sep 17 00:00:00 2001 From: Owen Lindsell Date: Tue, 26 Nov 2019 11:56:17 +0000 Subject: [PATCH 08/12] fixed to run both java and kotlin tests together (#540) * Fixed Java unit tests so they run along with Kotlin tests in the components/proxy module * Removed tests containing System.exit(); as Surefire does not support tests calling System.exit(). --- .../SanitisedHttpMessageFormatterTest.java | 15 +++---- components/proxy/pom.xml | 37 +++------------- .../com/hotels/styx/StyxPipelineFactory.java | 2 +- .../java/com/hotels/styx/StyxServerTest.java | 44 +------------------ .../yaml/YamlApplicationsProviderTest.java | 8 ++-- .../styx/proxy/BackendServicesRouterTest.java | 2 + .../LoadBalancerFactoryProviderTest.java | 4 +- .../handlers/StandardHttpPipelineTest.java | 8 ++-- .../PluginLoadingForStartupTest.java | 8 ++-- pom.xml | 1 - support/api-testsupport/pom.xml | 31 ------------- system-tests/ft-suite/pom.xml | 6 +++ 12 files changed, 37 insertions(+), 129 deletions(-) diff --git a/components/common/src/test/java/com/hotels/styx/common/format/SanitisedHttpMessageFormatterTest.java b/components/common/src/test/java/com/hotels/styx/common/format/SanitisedHttpMessageFormatterTest.java index ae6d242c22..fcff5fb49f 100644 --- a/components/common/src/test/java/com/hotels/styx/common/format/SanitisedHttpMessageFormatterTest.java +++ b/components/common/src/test/java/com/hotels/styx/common/format/SanitisedHttpMessageFormatterTest.java @@ -25,6 +25,8 @@ import static com.hotels.styx.api.HttpRequest.get; import static com.hotels.styx.api.HttpVersion.HTTP_1_1; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.matchesPattern; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; import static org.mockito.Matchers.any; @@ -62,30 +64,25 @@ public void setup() { @Test public void shouldFormatHttpRequest() { String formattedRequest = sanitisedHttpMessageFormatter.formatRequest(httpRequest); - assertMatchesRegex(formattedRequest, HTTP_REQUEST_PATTERN); + assertThat(formattedRequest, matchesPattern(HTTP_REQUEST_PATTERN)); } @Test public void shouldFormatLiveHttpRequest() { String formattedRequest = sanitisedHttpMessageFormatter.formatRequest(httpRequest.stream()); - assertMatchesRegex(formattedRequest, HTTP_REQUEST_PATTERN); + assertThat(formattedRequest, matchesPattern(HTTP_REQUEST_PATTERN)); } @Test public void shouldFormatHttpResponse() { String formattedResponse = sanitisedHttpMessageFormatter.formatResponse(httpResponse); - assertMatchesRegex(formattedResponse, HTTP_RESPONSE_PATTERN); + assertThat(formattedResponse, matchesPattern(HTTP_RESPONSE_PATTERN)); } @Test public void shouldFormatLiveHttpResponse() { String formattedResponse = sanitisedHttpMessageFormatter.formatResponse(httpResponse.stream()); - assertMatchesRegex(formattedResponse, HTTP_RESPONSE_PATTERN); - } - - private void assertMatchesRegex(String actual, String expected) { - assertTrue(actual.matches(expected), - "\n\nPattern to match: " + expected + "\nActual result: " + actual + "\n\n"); + assertThat(formattedResponse, matchesPattern(HTTP_RESPONSE_PATTERN)); } } \ No newline at end of file diff --git a/components/proxy/pom.xml b/components/proxy/pom.xml index 46a6e3d76a..e7495a7b15 100644 --- a/components/proxy/pom.xml +++ b/components/proxy/pom.xml @@ -117,6 +117,12 @@ junit-jupiter + + org.junit.platform + junit-platform-launcher + 1.5.2 + + org.hamcrest hamcrest @@ -162,7 +168,6 @@ io.kotlintest kotlintest-runner-junit5 - test @@ -213,36 +218,6 @@ maven-surefire-plugin - - org.codehaus.mojo - build-helper-maven-plugin - - - add-kotlin-sources - generate-sources - - add-source - - - - src/main/kotlin - - - - - add-kotlin-test-sources - generate-test-sources - - add-test-source - - - - src/test/kotlin - - - - - diff --git a/components/proxy/src/main/java/com/hotels/styx/StyxPipelineFactory.java b/components/proxy/src/main/java/com/hotels/styx/StyxPipelineFactory.java index 6cdcd9e135..ecd22159be 100644 --- a/components/proxy/src/main/java/com/hotels/styx/StyxPipelineFactory.java +++ b/components/proxy/src/main/java/com/hotels/styx/StyxPipelineFactory.java @@ -42,7 +42,7 @@ /** * Produces the pipeline for the Styx proxy server. */ -public final class StyxPipelineFactory { +public class StyxPipelineFactory { private final RoutingObjectFactory.Context builtinRoutingObjects; private final Environment environment; diff --git a/components/proxy/src/test/java/com/hotels/styx/StyxServerTest.java b/components/proxy/src/test/java/com/hotels/styx/StyxServerTest.java index 5be016f5da..bb6b559356 100644 --- a/components/proxy/src/test/java/com/hotels/styx/StyxServerTest.java +++ b/components/proxy/src/test/java/com/hotels/styx/StyxServerTest.java @@ -40,6 +40,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -177,12 +178,6 @@ public void allPluginsAreStartedEvenIfSomeFail() { } } - @Test - public void systemExitIsCalledWhenCreateStyxServerFails() { - Runtime runtime = captureSystemExit(() -> StyxServer.main(new String[0])); - verify(runtime).exit(1); - } - @Test public void serverDoesNotStartIfServiceFails() { StyxServer styxServer = null; @@ -211,24 +206,6 @@ public void startsFromMain() { } } - @Test - public void logsExceptionWhenConfigurationIsInvalid() { - try { - setProperty("STYX_HOME", fixturesHome()); - setProperty("CONFIG_FILE_LOCATION", Paths.get(fixturesHome()).resolve("conf/invalid.yml").toString()); - - Runtime runtime = captureSystemExit(() -> StyxServer.main(new String[0])); - verify(runtime).exit(2); - - eventually(() -> assertThat(log.log(), hasItem(loggingEvent(ERROR, - "Styx server failed to start due to configuration error in file .+: Missing a mandatory field 'proxy'" - )))); - } finally { - clearProperty("STYX_HOME"); - clearProperty("CONFIG_FILE_LOCATION"); - } - } - private static StyxService registryThatFailsToStart() { Registry registry = mock(Registry.class); when(registry.get()).thenReturn(emptyList()); @@ -320,23 +297,4 @@ private static void eventually(Runnable block) { throw new AssertionError("Eventually block did not complete in 3 seconds.", lastError); } - private static Runtime captureSystemExit(Runnable block) { - try { - Runtime originalRuntime = Runtime.getRuntime(); - Field runtimeField = Runtime.class.getDeclaredField("currentRuntime"); - Runtime mockRuntime = mock(Runtime.class); - try { - runtimeField.setAccessible(true); - runtimeField.set(Runtime.class, mockRuntime); - block.run(); - } finally { - runtimeField.set(Runtime.class, originalRuntime); - runtimeField.setAccessible(false); - } - - return mockRuntime; - } catch (ReflectiveOperationException e) { - throw new RuntimeException(e); - } - } } diff --git a/components/proxy/src/test/java/com/hotels/styx/applications/yaml/YamlApplicationsProviderTest.java b/components/proxy/src/test/java/com/hotels/styx/applications/yaml/YamlApplicationsProviderTest.java index 1313b633b9..e6392c495f 100644 --- a/components/proxy/src/test/java/com/hotels/styx/applications/yaml/YamlApplicationsProviderTest.java +++ b/components/proxy/src/test/java/com/hotels/styx/applications/yaml/YamlApplicationsProviderTest.java @@ -210,21 +210,21 @@ public void stickySessionEnabledWhenYamlStickySessionEnabledIsTrue() { public void cannotLoadWithNoApplications() throws IOException { Exception e = assertThrows(Exception.class, () -> loadFromPath("classpath:/conf/origins/empty-origins-for-configtest.yml")); - assertEquals("Invalid YAML from classpath:conf/origins/empty-origins-for-configtest.yml: No content to map due to end-of-input\n at \\[Source: .*\\]", e.getMessage()); + assertThat(e.getMessage(), matchesPattern("Invalid YAML from classpath:conf/origins/empty-origins-for-configtest.yml: No content to map due to end-of-input\n at \\[Source: .*\\]")); } @Test public void doesNotLoadIfFileDoesNotExist() { - Exception e = assertThrows(DslFunctionResolutionError.class, + Exception e = assertThrows(RuntimeException.class, () -> loadFromPath("/sadiusadasd")); assertThat(e.getMessage(), matchesPattern("Unable to load YAML from.*")); } @Test public void cannotLoadWithSyntaxErrors() throws IOException { - Exception e = assertThrows(DslFunctionResolutionError.class, + Exception e = assertThrows(RuntimeException.class, () -> loadFromPath("classpath:/conf/origins/origins-with-syntax-error-for-configtest.yml")); - assertEquals("Invalid YAML from classpath:conf/origins/origins-with-syntax-error-for-configtest.yml: Cannot deserialize instance of `java.util.ArrayList` out of VALUE_STRING token\n at \\[Source: .*\\]", e.getMessage()); + assertThat(e.getMessage(), matchesPattern("Invalid YAML from classpath:conf/origins/origins-with-syntax-error-for-configtest.yml: Cannot deserialize instance of `java.util.ArrayList` out of VALUE_STRING token\n at \\[Source: .*\\]")); } private static BackendService applicationFor(YamlApplicationsProvider provider, String appName) { diff --git a/components/proxy/src/test/java/com/hotels/styx/proxy/BackendServicesRouterTest.java b/components/proxy/src/test/java/com/hotels/styx/proxy/BackendServicesRouterTest.java index d8e7f84e9a..e618405f76 100644 --- a/components/proxy/src/test/java/com/hotels/styx/proxy/BackendServicesRouterTest.java +++ b/components/proxy/src/test/java/com/hotels/styx/proxy/BackendServicesRouterTest.java @@ -34,6 +34,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; import org.mockito.ArgumentCaptor; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -58,6 +59,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +@TestInstance(TestInstance.Lifecycle.PER_CLASS) public class BackendServicesRouterTest { private static final String APP_A = "appA"; private static final String APP_B = "appB"; diff --git a/components/proxy/src/test/java/com/hotels/styx/proxy/LoadBalancerFactoryProviderTest.java b/components/proxy/src/test/java/com/hotels/styx/proxy/LoadBalancerFactoryProviderTest.java index f2d1409f9e..f9a6cafb10 100644 --- a/components/proxy/src/test/java/com/hotels/styx/proxy/LoadBalancerFactoryProviderTest.java +++ b/components/proxy/src/test/java/com/hotels/styx/proxy/LoadBalancerFactoryProviderTest.java @@ -64,7 +64,7 @@ public void errorsIfCannotFindTheFactoryClass() { Configuration configurations = new YamlConfig(yaml); assertThrows(RuntimeException.class, - () -> newProvider(configurations)); + () -> newProvider(configurations).get()); } @Test @@ -76,7 +76,7 @@ public void errorsIfTheSpecifiedStrategyFactoryKeyClassIsMissing() { Configuration configurations = new YamlConfig(yaml); assertThrows(MissingConfigurationException.class, - () -> newProvider(configurations)); + () -> newProvider(configurations).get()); } @Test diff --git a/components/proxy/src/test/java/com/hotels/styx/routing/handlers/StandardHttpPipelineTest.java b/components/proxy/src/test/java/com/hotels/styx/routing/handlers/StandardHttpPipelineTest.java index e169599a94..fbf4868620 100644 --- a/components/proxy/src/test/java/com/hotels/styx/routing/handlers/StandardHttpPipelineTest.java +++ b/components/proxy/src/test/java/com/hotels/styx/routing/handlers/StandardHttpPipelineTest.java @@ -22,7 +22,9 @@ import com.hotels.styx.server.HttpInterceptorContext; import com.hotels.styx.server.track.RequestTracker; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import reactor.core.publisher.Mono; import java.net.InetSocketAddress; @@ -168,7 +170,8 @@ public void sendsExceptionUponMultipleSubscription() { () -> toObservable(responseObservable).toBlocking().first()); } - @Test + @ParameterizedTest + @MethodSource("multipleSubscriptionInterceptors") public void sendsExceptionUponExtraSubscriptionInsideInterceptor(HttpInterceptor interceptor) throws Exception { HttpHandler handler = (request, context) -> Eventual.of(response(OK).build()); @@ -176,9 +179,8 @@ public void sendsExceptionUponExtraSubscriptionInsideInterceptor(HttpInterceptor StandardHttpPipeline pipeline = new StandardHttpPipeline(interceptors, handler, RequestTracker.NO_OP); Eventual responseObservable = pipeline.handle(get("/").build(), HttpInterceptorContext.create()); - Exception e = assertThrows(IllegalStateException.class, + assertThrows(IllegalStateException.class, () -> toObservable(responseObservable).toBlocking().first()); - assertEquals("multipleSubscriptionInterceptors", e.getMessage()); } private static Stream multipleSubscriptionInterceptors() { diff --git a/components/proxy/src/test/java/com/hotels/styx/startup/extensions/PluginLoadingForStartupTest.java b/components/proxy/src/test/java/com/hotels/styx/startup/extensions/PluginLoadingForStartupTest.java index 89964fc3e2..e447376ef6 100644 --- a/components/proxy/src/test/java/com/hotels/styx/startup/extensions/PluginLoadingForStartupTest.java +++ b/components/proxy/src/test/java/com/hotels/styx/startup/extensions/PluginLoadingForStartupTest.java @@ -229,10 +229,10 @@ public void attemptsToLoadAllPluginsEvenIfSomePluginFactoriesCannotBeLoaded() { " classPath: " + FIXTURES_CLASS_PATH + "\n"; Exception e = assertThrows(RuntimeException.class, () -> PluginLoadingForStartup.loadPlugins(environment(yaml))); - assertEquals("3 plugin\\(s\\) could not be loaded: failedPlugins=\\[myPlugin1, myPlugin2, myPlugin3\\]; failureCauses=\\[" + + assertThat(e.getMessage(), matchesPattern("3 plugin\\(s\\) could not be loaded: failedPlugins=\\[myPlugin1, myPlugin2, myPlugin3\\]; failureCauses=\\[" + "myPlugin1: com.hotels.styx.api.configuration.ConfigurationException: Could not load a plugin factory.*, " + "myPlugin2: com.hotels.styx.api.configuration.ConfigurationException: Could not load a plugin factory.*, " + - "myPlugin3: com.hotels.styx.api.configuration.ConfigurationException: Could not load a plugin factory.*\\]", e.getMessage()); + "myPlugin3: com.hotels.styx.api.configuration.ConfigurationException: Could not load a plugin factory.*\\]")); assertThat(log.log(), hasItem(loggingEvent(ERROR, "Could not load plugin: pluginName=myPlugin1; factoryClass=.*", ConfigurationException.class, "Could not load a plugin factory for.*"))); log.stop(); } @@ -259,10 +259,10 @@ public void attemptsToLoadAllPluginsEvenIfSomePluginFactoriesFailDuringExecution " classPath: " + FIXTURES_CLASS_PATH + "\n"; Exception e = assertThrows(RuntimeException.class, () -> PluginLoadingForStartup.loadPlugins(environment(yaml))); - assertEquals( "3 plugin\\(s\\) could not be loaded: failedPlugins=\\[myPlugin1, myPlugin2, myPlugin3\\]; failureCauses=\\[" + + assertThat(e.getMessage(), matchesPattern( "3 plugin\\(s\\) could not be loaded: failedPlugins=\\[myPlugin1, myPlugin2, myPlugin3\\]; failureCauses=\\[" + "myPlugin1: java.lang.RuntimeException: plugin factory error, " + "myPlugin2: java.lang.RuntimeException: plugin factory error, " + - "myPlugin3: java.lang.RuntimeException: plugin factory error\\]", e.getMessage()); + "myPlugin3: java.lang.RuntimeException: plugin factory error\\]")); assertThat(log.log(), hasItem(loggingEvent(ERROR, "Could not load plugin: pluginName=myPlugin1; factoryClass=.*", RuntimeException.class, "plugin factory error"))); log.stop(); diff --git a/pom.xml b/pom.xml index 8367383689..f3f6946997 100644 --- a/pom.xml +++ b/pom.xml @@ -137,7 +137,6 @@ 2.2 3.1.0 5.5.2 - 3.0.5 1.14.0 0.8.0 diff --git a/support/api-testsupport/pom.xml b/support/api-testsupport/pom.xml index 6e380ff342..143cd3f1ac 100755 --- a/support/api-testsupport/pom.xml +++ b/support/api-testsupport/pom.xml @@ -81,37 +81,6 @@ kotlin-maven-plugin - - org.codehaus.mojo - build-helper-maven-plugin - - - add-kotlin-sources - generate-sources - - add-source - - - - src/main/kotlin - - - - - add-kotlin-test-sources - generate-test-sources - - add-test-source - - - - src/test/kotlin - - - - - - org.apache.maven.plugins maven-deploy-plugin diff --git a/system-tests/ft-suite/pom.xml b/system-tests/ft-suite/pom.xml index 96a8fb0202..3eaa56e6dc 100644 --- a/system-tests/ft-suite/pom.xml +++ b/system-tests/ft-suite/pom.xml @@ -27,6 +27,12 @@ com.hotels.styx styx-proxy + + + org.junit.platform + junit-platform-launcher + + From 44781c30538bdfe8a683393ba79d843e0b4b2f25 Mon Sep 17 00:00:00 2001 From: OwenLindsell Date: Tue, 26 Nov 2019 15:57:30 +0000 Subject: [PATCH 09/12] Implemented code to terminate connection in the same way the old rxJava worked --- .../hotels/styx/client/StyxHttpClient.java | 26 ++++++++----------- .../FlowControllingHttpContentProducer.java | 2 +- .../connectionpool/HttpRequestOperation.java | 7 +---- .../NettyToStyxResponsePropagator.java | 2 -- .../styx/server/ServerConnectionsSpec.scala | 1 - 5 files changed, 13 insertions(+), 25 deletions(-) diff --git a/components/client/src/main/java/com/hotels/styx/client/StyxHttpClient.java b/components/client/src/main/java/com/hotels/styx/client/StyxHttpClient.java index 3293d59d71..5099b2c0ce 100644 --- a/components/client/src/main/java/com/hotels/styx/client/StyxHttpClient.java +++ b/components/client/src/main/java/com/hotels/styx/client/StyxHttpClient.java @@ -21,13 +21,13 @@ import com.hotels.styx.api.HttpResponse; import com.hotels.styx.api.LiveHttpRequest; import com.hotels.styx.api.LiveHttpResponse; +import com.hotels.styx.api.ResponseEventListener; import com.hotels.styx.api.Url; import com.hotels.styx.api.extension.Origin; import com.hotels.styx.api.extension.service.TlsSettings; import com.hotels.styx.client.netty.connectionpool.NettyConnectionFactory; import com.hotels.styx.client.ssl.SslContextFactory; import io.netty.handler.ssl.SslContext; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.Optional; @@ -118,28 +118,24 @@ static Mono sendRequestInternal(NettyConnectionFactory connect SslContext sslContext = getSslContext(params.https(), params.tlsSettings()); - Mono responseObservable = connectionFactory.createConnection( + return connectionFactory.createConnection( origin, new ConnectionSettings(params.connectTimeoutMillis()), sslContext ).flatMap(connection -> Mono.from(connection.write(networkRequest) - .doOnTerminate(() -> { - // @Mikko, In the new code this gets called for each of the ctx.close() calls in ExcessConnectionRejector. However, it - // does not get called when the streams complete - I think because they are cancelled, not completed. - // In the old rxJava code, this gets called 10 times to close all 10 connections. I think this is - // because the streams complete successfully. You will see a call made to FlowControllingHttpContentProducer.contentEndEventWhileStreaming - // in the old code, but not in the new code. - System.out.println("****TERMINATING: " + count.incrementAndGet()); - connection.close(); - }) + .doOnComplete(connection::close) + .doOnError(e -> connection.close()) + .map(response -> response.newBuilder() + .body(it -> + it.doOnEnd(x -> connection.close()) + .doOnCancel(() -> connection.close()) + ) + .build() + ) ) ); - - return responseObservable; - } - static AtomicInteger count = new AtomicInteger(); private static LiveHttpRequest addUserAgent(String userAgent, LiveHttpRequest request) { if (userAgent != null) { diff --git a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/FlowControllingHttpContentProducer.java b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/FlowControllingHttpContentProducer.java index 451fedefeb..3ad3f9e76a 100644 --- a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/FlowControllingHttpContentProducer.java +++ b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/FlowControllingHttpContentProducer.java @@ -296,7 +296,7 @@ private ProducerState contentEndEventWhileStreaming(ContentEndEvent event) { if (readQueue.size() > 0) { return EMITTING_BUFFERED_CONTENT; } else { - this.contentSubscriber.onCompleted(); // @Mikko, this get called in the old code, but not the new code + this.contentSubscriber.onCompleted(); this.onCompleteAction.run(); return COMPLETED; } diff --git a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java index ca31645e2e..5bcfcc8f00 100644 --- a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java +++ b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java @@ -22,6 +22,7 @@ import com.hotels.styx.api.LiveHttpRequest; import com.hotels.styx.api.LiveHttpResponse; import com.hotels.styx.api.Requests; +import com.hotels.styx.api.ResponseEventListener; import com.hotels.styx.api.exceptions.TransportLostException; import com.hotels.styx.api.extension.Origin; import com.hotels.styx.client.OriginStatsFactory; @@ -148,12 +149,6 @@ public Flux execute(NettyConnection nettyConnection) { } }); - responseFlux = responseFlux.doOnCancel(() -> { - // @Mikko, In the old rxJava code the equivalent of doOnCancel is doOnUnsubscribe, which would also get called here. - // So maybe the problem isn't that it's being cancelled, because the same behaviour is in the old and new code. - System.out.println("****Cancelling"); - }); - if (requestLoggingEnabled) { responseFlux = responseFlux .doOnNext(response -> { diff --git a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java index d6ca1baa78..717fcdafe6 100644 --- a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java +++ b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java @@ -204,8 +204,6 @@ private FlowControllingHttpContentProducer createProducer(ChannelHandlerContext private void emitResponseCompleted() { if (responseCompleted.compareAndSet(false, true)) { - // @Mikko, Sink seems to always be cancelled. I think this is what is causing the doOnTerminate (line 127 in StyxHttpClient) not to be executed - System.out.println("Sink cancelled state: " + sink.isCancelled()); sink.complete(); } } diff --git a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/server/ServerConnectionsSpec.scala b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/server/ServerConnectionsSpec.scala index 14a15d89fa..806ea0fc01 100644 --- a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/server/ServerConnectionsSpec.scala +++ b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/server/ServerConnectionsSpec.scala @@ -79,7 +79,6 @@ class ServerConnectionsSpec extends FunSpec }) eventually(timeout(1 second)) { - // @Mikko, this is the test that is failing getTotalConnectionsMetric.bodyAs(UTF_8) should be("{\"connections.total-connections\":{\"count\":0}}") } } From ed17f3930df3ad24fd699bfb6708f00b0c169977 Mon Sep 17 00:00:00 2001 From: OwenLindsell Date: Wed, 27 Nov 2019 08:06:43 +0000 Subject: [PATCH 10/12] removed unused imports --- .../client/src/main/java/com/hotels/styx/client/Connection.java | 1 - .../src/main/java/com/hotels/styx/client/StyxHttpClient.java | 2 -- .../hotels/styx/client/connectionpool/ExpiringConnection.java | 1 - .../styx/client/connectionpool/stubs/StubConnectionFactory.java | 1 - .../styx/client/netty/connectionpool/HttpRequestOperation.java | 2 -- .../styx/client/netty/connectionpool/NettyConnection.java | 1 - 6 files changed, 8 deletions(-) diff --git a/components/client/src/main/java/com/hotels/styx/client/Connection.java b/components/client/src/main/java/com/hotels/styx/client/Connection.java index f80eb652c2..71a1f82ce8 100644 --- a/components/client/src/main/java/com/hotels/styx/client/Connection.java +++ b/components/client/src/main/java/com/hotels/styx/client/Connection.java @@ -18,7 +18,6 @@ import com.hotels.styx.api.LiveHttpRequest; import com.hotels.styx.api.LiveHttpResponse; import com.hotels.styx.api.extension.Origin; -import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; diff --git a/components/client/src/main/java/com/hotels/styx/client/StyxHttpClient.java b/components/client/src/main/java/com/hotels/styx/client/StyxHttpClient.java index 5099b2c0ce..cb19167fa4 100644 --- a/components/client/src/main/java/com/hotels/styx/client/StyxHttpClient.java +++ b/components/client/src/main/java/com/hotels/styx/client/StyxHttpClient.java @@ -21,7 +21,6 @@ import com.hotels.styx.api.HttpResponse; import com.hotels.styx.api.LiveHttpRequest; import com.hotels.styx.api.LiveHttpResponse; -import com.hotels.styx.api.ResponseEventListener; import com.hotels.styx.api.Url; import com.hotels.styx.api.extension.Origin; import com.hotels.styx.api.extension.service.TlsSettings; @@ -33,7 +32,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import static com.google.common.base.Preconditions.checkArgument; import static com.hotels.styx.api.HttpHeaderNames.HOST; diff --git a/components/client/src/main/java/com/hotels/styx/client/connectionpool/ExpiringConnection.java b/components/client/src/main/java/com/hotels/styx/client/connectionpool/ExpiringConnection.java index 4d71742dc4..5399a195ab 100644 --- a/components/client/src/main/java/com/hotels/styx/client/connectionpool/ExpiringConnection.java +++ b/components/client/src/main/java/com/hotels/styx/client/connectionpool/ExpiringConnection.java @@ -21,7 +21,6 @@ import com.hotels.styx.api.LiveHttpResponse; import com.hotels.styx.api.extension.Origin; import com.hotels.styx.client.Connection; -import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import java.util.function.Supplier; diff --git a/components/client/src/main/java/com/hotels/styx/client/connectionpool/stubs/StubConnectionFactory.java b/components/client/src/main/java/com/hotels/styx/client/connectionpool/stubs/StubConnectionFactory.java index 46b699cf5c..2ebb1fcd7d 100644 --- a/components/client/src/main/java/com/hotels/styx/client/connectionpool/stubs/StubConnectionFactory.java +++ b/components/client/src/main/java/com/hotels/styx/client/connectionpool/stubs/StubConnectionFactory.java @@ -21,7 +21,6 @@ import com.hotels.styx.api.extension.Origin; import com.hotels.styx.client.Connection; import com.hotels.styx.client.ConnectionSettings; -import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; diff --git a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java index 5bcfcc8f00..726177246e 100644 --- a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java +++ b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java @@ -22,7 +22,6 @@ import com.hotels.styx.api.LiveHttpRequest; import com.hotels.styx.api.LiveHttpResponse; import com.hotels.styx.api.Requests; -import com.hotels.styx.api.ResponseEventListener; import com.hotels.styx.api.exceptions.TransportLostException; import com.hotels.styx.api.extension.Origin; import com.hotels.styx.client.OriginStatsFactory; @@ -38,7 +37,6 @@ import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.HttpObject; import io.netty.handler.timeout.IdleStateHandler; -import org.reactivestreams.Publisher; import org.slf4j.Logger; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; diff --git a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyConnection.java b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyConnection.java index 12b3b5287e..52ba1179ae 100644 --- a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyConnection.java +++ b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyConnection.java @@ -29,7 +29,6 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.util.AttributeKey; -import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import java.util.Optional; From 02e07fc86fa361b2ea2a82ade566410b6439a721 Mon Sep 17 00:00:00 2001 From: Mikko Karjalainen Date: Wed, 27 Nov 2019 14:57:23 +0000 Subject: [PATCH 11/12] Fix issue: #525: A race condition in styx object store. (#537) --- .../styx/api/configuration/ObjectStore.java | 7 + .../styx/routing/db/NotificationQueue.kt | 129 ++++++++++ .../hotels/styx/routing/db/StyxObjectStore.kt | 91 +++---- .../styx/routing/db/StyxObjectStoreTest.kt | 241 +++++++++++++----- 4 files changed, 357 insertions(+), 111 deletions(-) create mode 100644 components/proxy/src/main/kotlin/com/hotels/styx/routing/db/NotificationQueue.kt diff --git a/components/api/src/main/java/com/hotels/styx/api/configuration/ObjectStore.java b/components/api/src/main/java/com/hotels/styx/api/configuration/ObjectStore.java index 32e9c268ef..cafdbf74c4 100644 --- a/components/api/src/main/java/com/hotels/styx/api/configuration/ObjectStore.java +++ b/components/api/src/main/java/com/hotels/styx/api/configuration/ObjectStore.java @@ -41,4 +41,11 @@ public interface ObjectStore { * @return a collection of all entries. */ Collection> entrySet(); + + /** + * Returns this snapshot index. + * + * @return snapshot index. + */ + long index(); } diff --git a/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/NotificationQueue.kt b/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/NotificationQueue.kt new file mode 100644 index 0000000000..d3fbc41e7d --- /dev/null +++ b/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/NotificationQueue.kt @@ -0,0 +1,129 @@ +/* + Copyright (C) 2013-2019 Expedia Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.hotels.styx.routing.db + +import com.hotels.styx.api.configuration.ObjectStore +import org.pcollections.HashTreePMap +import org.pcollections.PMap +import reactor.core.publisher.FluxSink +import java.util.Optional +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.ExecutorService +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + + +internal class NotificationQueue(val watchers: CopyOnWriteArrayList>, val executor: ExecutorService) { + @Volatile + private var pendingSnapshot = IndexedSnapshot(0, HashTreePMap.empty()) + @Volatile + private var issuedSnapshot = IndexedSnapshot(0, HashTreePMap.empty()) + private val pendingChangeNotification = AtomicBoolean(false) + private val lock = ReentrantLock() + + // Listeners are just for testing purposes. + private val listeners = ConcurrentHashMap>() + + fun publishChange(snapshot: IndexedSnapshot) { + + val inQueue = lock.withLock { + // Preserve invariant: + // - pendingSnapshot is only ever increasing + // - (pendingChangeNotification == True) only if (pendingSnapshot > issuedSnapshot) + if (snapshot.index > pendingSnapshot.index) { + pendingSnapshot = snapshot + pendingChangeNotification.getAndSet(true) + } else { + // Ignore this event. Pending snapshot is more recent. + return + } + } + + if (!inQueue) { + executor.submit { + lock.withLock { + // Preserve invariant: + // - (pendingChangeNotification == False) only if (pendingSnapshot <= issuedSnapshot) + pendingChangeNotification.set(false) + issuedSnapshot = pendingSnapshot + } + + watchers.forEach { + it.invoke(newSnapshot(issuedSnapshot)) + } + + listeners.forEach { + it.value.invoke(ChangeNotification( + newSnapshot(issuedSnapshot), + pendingChangeNotification.get() + )) + } + } + } + } + + fun publishInitialWatch(sink: FluxSink>) { + executor.submit { + sink.next(newSnapshot(issuedSnapshot)) + listeners.forEach { + it.value.invoke(InitialWatchNotification( + newSnapshot(issuedSnapshot), + pendingChangeNotification.get() + )) + } + } + } + + internal fun addDispatchListener(key: String, listener: DispatchListener) { + listeners.put(key, listener) + } + + internal fun removeDispatchListener(key: String) { + listeners.remove(key) + } + + private fun newSnapshot(snapshot: IndexedSnapshot) = object : ObjectStore { + override fun get(key: String?): Optional { + return Optional.ofNullable(snapshot.snapshot[key]) + } + + override fun entrySet(): Collection> = entrySet(snapshot.snapshot) + + override fun index() = snapshot.index + } +} + +internal fun entrySet(snapshot: PMap): Collection> = snapshot.entries + +internal data class IndexedSnapshot(val index: Long, val snapshot: PMap) { + fun map(modification: (PMap) -> PMap) = IndexedSnapshot(this.index + 1, modification(this.snapshot)) +} + +internal typealias ChangeWatcher = (ObjectStore) -> Unit + +internal typealias DispatchListener = (DispatchListenerNotification) -> Unit + +internal sealed class DispatchListenerNotification + +internal data class ChangeNotification( + val snapshot: ObjectStore, + val pendingNotifications: Boolean) : DispatchListenerNotification() + +internal data class InitialWatchNotification( + val snapshot: ObjectStore, + val pendingNotifications: Boolean) : DispatchListenerNotification() diff --git a/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/StyxObjectStore.kt b/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/StyxObjectStore.kt index 9ff7c403ea..f00699bbed 100644 --- a/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/StyxObjectStore.kt +++ b/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/StyxObjectStore.kt @@ -17,26 +17,31 @@ package com.hotels.styx.routing.db; import com.hotels.styx.api.configuration.ObjectStore import org.pcollections.HashTreePMap -import org.pcollections.PMap import org.reactivestreams.Publisher import reactor.core.publisher.Flux -import reactor.core.publisher.FluxSink import java.util.Optional import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicReference /** * Styx Route Database. */ -class StyxObjectStore : ObjectStore { - private val objects: AtomicReference> = AtomicReference(HashTreePMap.empty()) + +class StyxObjectStore internal constructor(executor: ExecutorService): ObjectStore { + private val objects: AtomicReference> = AtomicReference( + IndexedSnapshot(0, HashTreePMap.empty())) + private val watchers = CopyOnWriteArrayList>() + private val notificationQueue = NotificationQueue(watchers, executor) companion object { - private val executor = Executors.newSingleThreadExecutor() + private val sharedExecutor = Executors.newSingleThreadExecutor() } + constructor(): this(sharedExecutor) + /** * Retrieves an object from this object store. * @@ -49,13 +54,13 @@ class StyxObjectStore : ObjectStore { */ override fun get(name: String): Optional { - return Optional.ofNullable(objects().get(name)) + return Optional.ofNullable(objects().snapshot.get(name)) } /** * Retrieves all entries. */ - override fun entrySet(): Collection> = entrySet(objects.get()) + override fun entrySet(): Collection> = entrySet(objects.get().snapshot) /** * Inserts a new object in object store. @@ -75,18 +80,16 @@ class StyxObjectStore : ObjectStore { require(key.isNotEmpty()) { "ObjectStore insert: empty keys are not allowed." } var current = objects.get() - var new = current.plus(key, payload) + var new = current.map { it.plus(key, payload) } while (!objects.compareAndSet(current, new)) { current = objects.get() - new = current.plus(key, payload) + new = current.map { it.plus(key, payload) } } - queue { - notifyWatchers(new) - } + notificationQueue.publishChange(new) - return Optional.ofNullable(current[key]) + return Optional.ofNullable(current.snapshot[key]) } /** @@ -121,17 +124,17 @@ class StyxObjectStore : ObjectStore { fun compute(key: String, computation: (T?) -> T): Optional { require(key.isNotEmpty()) { "ObjectStore compute: empty keys are not allowed." } - var current: PMap + var current: IndexedSnapshot var result: T - var new: PMap + var new: IndexedSnapshot do { current = objects.get() - result = computation(current.get(key)) + result = computation(current.snapshot.get(key)) - new = if (result != current.get(key)){ + new = if (result != current.snapshot.get(key)){ // Consumer REPLACES an existing value or ADDS a new value - current.plus(key, result) + current.map { it.plus(key, result) } } else { // Consumer KEEPS the existing value current @@ -140,12 +143,10 @@ class StyxObjectStore : ObjectStore { if (current != new) { // Notify only if content changed: - queue { - notifyWatchers(new) - } + notificationQueue.publishChange(new) } - return Optional.ofNullable(current[key]) + return Optional.ofNullable(current.snapshot[key]) } @@ -164,20 +165,20 @@ class StyxObjectStore : ObjectStore { */ fun remove(key: String): Optional { var current = objects.get() - var new = current.minus(key) + var new = current.map { it.minus(key) } + // Unnecessarily increments the index when "key" doesn't exist: + // We will live with this for now. while (!objects.compareAndSet(current, new)) { current = objects.get() - new = current.minus(key) + new = current.map { it.minus(key) } } - if (current != new) { - queue { - notifyWatchers(new) - } + if (current.snapshot != new.snapshot) { + notificationQueue.publishChange(new) } - return Optional.ofNullable(current[key]) + return Optional.ofNullable(current.snapshot[key]) } /** @@ -195,41 +196,23 @@ class StyxObjectStore : ObjectStore { } watchers.add(watcher) - queue { - emitInitialSnapshot(sink) - } - } - } - private fun emitInitialSnapshot(sink: FluxSink>) { - sink.next(snapshot(objects())) + notificationQueue.publishInitialWatch(sink) + } } internal fun watchers() = watchers.size private fun objects() = objects.get() - private fun queue(task: () -> Unit) { - executor.submit(task) - } + override fun index() = objects.get().index - private fun notifyWatchers(objectsV2: PMap) { - watchers.forEach { listener -> - listener.invoke(snapshot(objectsV2)) - } + internal fun addDispatchListener(key: String, listener: DispatchListener) { + notificationQueue.addDispatchListener(key, listener) } - private fun snapshot(snapshot: PMap) = object : ObjectStore { - override fun get(key: String?): Optional { - return Optional.ofNullable(snapshot[key]) - } - - override fun entrySet(): Collection> = entrySet(snapshot) + internal fun removeDispatchListener(key: String) { + notificationQueue.removeDispatchListener(key) } - - private fun entrySet(snapshot: PMap): Collection> = snapshot - .entries } - -private typealias ChangeWatcher = (ObjectStore) -> Unit diff --git a/components/proxy/src/test/kotlin/com/hotels/styx/routing/db/StyxObjectStoreTest.kt b/components/proxy/src/test/kotlin/com/hotels/styx/routing/db/StyxObjectStoreTest.kt index 69a487578a..923341d76f 100644 --- a/components/proxy/src/test/kotlin/com/hotels/styx/routing/db/StyxObjectStoreTest.kt +++ b/components/proxy/src/test/kotlin/com/hotels/styx/routing/db/StyxObjectStoreTest.kt @@ -21,16 +21,24 @@ import io.kotlintest.matchers.boolean.shouldBeTrue import io.kotlintest.matchers.collections.shouldBeEmpty import io.kotlintest.matchers.collections.shouldNotBeEmpty import io.kotlintest.matchers.numerics.shouldBeGreaterThanOrEqual +import io.kotlintest.matchers.numerics.shouldBeLessThanOrEqual import io.kotlintest.milliseconds import io.kotlintest.seconds import io.kotlintest.shouldBe +import io.kotlintest.shouldNotBe import io.kotlintest.specs.FeatureSpec import reactor.core.publisher.Flux +import reactor.core.publisher.toFlux import reactor.test.StepVerifier +import java.time.Duration import java.util.Optional import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors import java.util.concurrent.Executors.newFixedThreadPool +import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit.SECONDS +import java.util.concurrent.atomic.AtomicReference // We can remove AssertionError::class.java argument from the // calls to `eventually`, after this bug fix is released: @@ -84,6 +92,7 @@ class StyxObjectStoreTest : FeatureSpec() { it.get("y") shouldBe Optional.of("y") } .thenCancel() + .log() .verify(4.seconds) db.watchers() shouldBe 0 @@ -105,6 +114,43 @@ class StyxObjectStoreTest : FeatureSpec() { } } + scenario("Maintains relative ordering between change and initial watch notifications") { + val executor = Executors.newSingleThreadExecutor() + + val db = StyxObjectStore(executor) + val events = mutableListOf() + + val watchConsumer = db.watch().toFlux().subscribe { + // Keeps the event notification thread busy to build up a backlog of events. + Thread.sleep(10) + } + + db.insert("key", 1) + db.insert("key", 2) + + val watcher = db.watch() + .toFlux() + .subscribe { + events.add(it.get("key").orElse(0xBAD_BEEF)) + } + + db.insert("key", 3) + db.insert("key", 4) + + watchConsumer.dispose() + watcher.dispose() + + executor.shutdown() + executor.awaitTermination(250, TimeUnit.MILLISECONDS) + + // Ensure the events were delivered in order + events.fold(0) { acc, value -> + value.shouldNotBe(0xBAD_BEEF) + value.shouldBeGreaterThanOrEqual(acc) + value + } + } + scenario("Replaces already existing object") { val db = StyxObjectStore() @@ -165,8 +211,21 @@ class StyxObjectStoreTest : FeatureSpec() { scenario("Replace an existing value") { val db = StyxObjectStore() + val latch = CountDownLatch(1) + + db.addDispatchListener("_") { + when (it) { + is ChangeNotification -> + if (it.snapshot.index() == 1L) { + latch.countDown() + } + + else -> { } + } + } db.insert("key", "old value") + latch.await() StepVerifier.create(db.watch()) .expectNextCount(1) @@ -175,6 +234,43 @@ class StyxObjectStoreTest : FeatureSpec() { .thenCancel() .verify() } + + scenario("Maintains relative ordering between change and initial watch notifications") { + val executor = Executors.newSingleThreadExecutor() + + val db = StyxObjectStore(executor) + val events = mutableListOf() + + val watchConsumer = db.watch().toFlux().subscribe { + // Keeps the event notification thread busy to build up a backlog of events. + Thread.sleep(10) + } + + db.compute("key") { 1 } + db.compute("key") { 2 } + + val watcher = db.watch() + .toFlux() + .subscribe { + events.add(it.get("key").orElse(0xBAD_BEEF)) + } + + db.compute("key") { 3 } + db.compute("key") { 4 } + + watchConsumer.dispose() + watcher.dispose() + + executor.shutdown() + executor.awaitTermination(250, TimeUnit.MILLISECONDS) + + // Ensure the events were delivered in order + events.fold(0) { acc, value -> + value.shouldNotBe(0xBAD_BEEF) + value.shouldBeGreaterThanOrEqual(acc) + value + } + } } feature("Remove") { @@ -256,76 +352,70 @@ class StyxObjectStoreTest : FeatureSpec() { db.entrySet().shouldBeEmpty() } - scenario("Returns Optional.empty, when previous value doesn't exist") { - val db = StyxObjectStore() - - db.remove("key") shouldBe Optional.empty() - } - - scenario("Returns previous, replaced value") { - val db = StyxObjectStore() + scenario("Maintains relative ordering between change and initial watch notifications") { + val executor = Executors.newSingleThreadExecutor() - db.insert("key", "a-value") shouldBe Optional.empty() + val db = StyxObjectStore(executor) + db.insert("key-01", 1) + db.insert("key-02", 2) + db.insert("key-03", 3) + db.insert("key-04", 4) - db.remove("key") shouldBe Optional.of("a-value") - } - } + val events = mutableListOf() - feature("Watch") { - scenario("Supports multiple watchers") { - val db = StyxObjectStore() - val watchEvents1 = CopyOnWriteArrayList>() - val watchEvents2 = CopyOnWriteArrayList>() + val watchConsumer = db.watch().toFlux().subscribe { + // Keeps the event notification thread busy to build up a backlog of events. + Thread.sleep(10) + } - val watcher1 = Flux.from(db.watch()).subscribe { watchEvents1.add(it) } - val watcher2 = Flux.from(db.watch()).subscribe { watchEvents2.add(it) } + db.remove("key-01") + db.remove("key-02") - // Wait for the initial watch event ... - eventually(1.seconds, java.lang.AssertionError::class.java) { - watchEvents1.size shouldBe 1 - watchEvents1[0].get("x") shouldBe Optional.empty() + val watcher = db.watch() + .toFlux() + .subscribe { + events.add(it.index()) + } - watchEvents2.size shouldBe 1 - watchEvents2[0].get("x") shouldBe Optional.empty() - } + db.remove("key-03") + db.remove("key-04") - db.insert("x", "x") - db.insert("y", "y") + watchConsumer.dispose() + watcher.dispose() - // ... otherwise we aren't guaranteed what events are going show up. - // - // The ordering between initial watch event in relation to objectStore.inserts are - // non-deterministic. - eventually(1.seconds, AssertionError::class.java) { - watchEvents1.size shouldBe 3 - watchEvents2.size shouldBe 3 + executor.shutdown() + executor.awaitTermination(250, TimeUnit.MILLISECONDS) - watchEvents1[1].get("x") shouldBe Optional.of("x") - watchEvents2[1].get("x") shouldBe Optional.of("x") + // Ensure the events were delivered in order + events.fold(0L) { previous, index -> + index.shouldBeGreaterThanOrEqual(previous) + index + } + } - watchEvents1[1].get("y") shouldBe Optional.empty() - watchEvents2[1].get("y") shouldBe Optional.empty() + scenario("Returns Optional.empty, when previous value doesn't exist") { + val db = StyxObjectStore() - watchEvents1[2].get("x") shouldBe Optional.of("x") - watchEvents2[2].get("x") shouldBe Optional.of("x") + db.remove("key") shouldBe Optional.empty() + } - watchEvents1[2].get("y") shouldBe Optional.of("y") - watchEvents2[2].get("y") shouldBe Optional.of("y") - } + scenario("Returns previous, replaced value") { + val db = StyxObjectStore() - watcher1.dispose() - db.watchers() shouldBe 1 + db.insert("key", "a-value") shouldBe Optional.empty() - watcher2.dispose() - db.watchers() shouldBe 0 + db.remove("key") shouldBe Optional.of("a-value") } - scenario("Provides immutable snapshot") { + } + + feature("Watch") { + scenario("Publishes an immutable final state snapshot") { val db = StyxObjectStore() val watchEvents = CopyOnWriteArrayList>() - val watcher = Flux.from(db.watch()).subscribe { watchEvents.add(it) } + val watcher = db.watch().toFlux().subscribe { watchEvents.add(it) } eventually(1.seconds, AssertionError::class.java) { watchEvents.isNotEmpty().shouldBeTrue() @@ -337,18 +427,55 @@ class StyxObjectStoreTest : FeatureSpec() { db.insert("y", "y") eventually(1.seconds, AssertionError::class.java) { - watchEvents.size shouldBe 3 - watchEvents[1].get("x") shouldBe Optional.of("x") - watchEvents[1].get("y") shouldBe Optional.empty() - - watchEvents[2].get("x") shouldBe Optional.of("x") - watchEvents[2].get("y") shouldBe Optional.of("y") + watchEvents.last()["x"].isPresent.shouldBeTrue() + watchEvents.last()["y"].isPresent.shouldBeTrue() } watcher.dispose() db.watchers() shouldBe 0 } + scenario("Supports multiple watchers") { + for (x in 0..100) { + val db = StyxObjectStore() + val watchEvents1 = CopyOnWriteArrayList>() + val watchEvents2 = CopyOnWriteArrayList>() + + val watcher1 = Flux.from(db.watch()).subscribe { watchEvents1.add(it) } + val watcher2 = Flux.from(db.watch()).subscribe { watchEvents2.add(it) } + + // Wait for the initial watch event ... + eventually(1.seconds, java.lang.AssertionError::class.java) { + watchEvents1.size shouldBe 1 + watchEvents1[0].get("x") shouldBe Optional.empty() + + watchEvents2.size shouldBe 1 + watchEvents2[0].get("x") shouldBe Optional.empty() + } + + db.insert("x", "x") + db.insert("y", "y") + + // ... otherwise we aren't guaranteed what events are going show up. + // + // The ordering between initial watch event in relation to objectStore.inserts are + // non-deterministic. + eventually(1.seconds, AssertionError::class.java) { + watchEvents1.last()["x"].isPresent.shouldBeTrue() + watchEvents1.last()["y"].isPresent.shouldBeTrue() + + watchEvents1.last()["x"].shouldBe(Optional.of("x")) + watchEvents1.last()["y"].shouldBe(Optional.of("y")) + } + + watcher1.dispose() + db.watchers() shouldBe 1 + + watcher2.dispose() + db.watchers() shouldBe 0 + } + } + scenario("Provides current snapshot at subscription") { val db = StyxObjectStore() val watchEvents = CopyOnWriteArrayList>() @@ -416,4 +543,4 @@ class StyxObjectStoreTest : FeatureSpec() { } } -} \ No newline at end of file +} From d385c9296ed5818a0a9a2d5f277d49dddaa7faa5 Mon Sep 17 00:00:00 2001 From: OwenLindsell Date: Thu, 28 Nov 2019 14:07:54 +0000 Subject: [PATCH 12/12] fixed spelling --- .../connectionpool/NettyToStyxResponsePropagatorTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagatorTest.java b/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagatorTest.java index 680bbe910c..9c04b5d9ff 100644 --- a/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagatorTest.java +++ b/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagatorTest.java @@ -124,8 +124,8 @@ public void doesNotPropagateErrorsTwice() throws Exception { StepVerifier.create(response.body()) .then(channel::runPendingTasks) // Execute onSubscribe in FSM - .then(() -> channel.pipeline().fireExceptionCaught(new RuntimeException())) // Will emmit BadHttpResponseException - .then(() -> channel.pipeline().fireChannelInactive()) // Will emmit TransportLostException + .then(() -> channel.pipeline().fireExceptionCaught(new RuntimeException())) // Will emit BadHttpResponseException + .then(() -> channel.pipeline().fireChannelInactive()) // Will emit TransportLostException .expectError(BadHttpResponseException.class) .verify();