Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace rxjava with reactor #541

Merged
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,11 @@ public interface ObjectStore<T> {
* @return a collection of all entries.
*/
Collection<Map.Entry<String, T>> entrySet();

/**
* Returns this snapshot index.
*
* @return snapshot index.
*/
long index();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import com.hotels.styx.api.LiveHttpRequest;
import com.hotels.styx.api.LiveHttpResponse;
import com.hotels.styx.api.extension.Origin;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;

import java.io.Closeable;
import java.util.EventListener;
Expand Down Expand Up @@ -47,9 +47,9 @@ interface Factory {
* Writes HTTP request to a remote peer in the context of this connection.
*
* @param request
* @return an observable that provides the response
* @return a Publisher that provides the response
*/
Observable<LiveHttpResponse> write(LiveHttpRequest request);
Flux<LiveHttpResponse> write(LiveHttpRequest request);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could convert this to Mono. But that would be a separate increment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to have Flux (or Mono) in the signature here? I see an advantage of "Mono" as it depicts there's only going to be 0..1 events, but otherwise I would try to expose Reactive-streams types instead of the one from reactor-core.


/**
* Returns if the underlying connection is still active.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.hotels.styx.client.connectionpool.ConnectionPool;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import rx.RxReactiveStreams;

import static java.util.Objects.requireNonNull;

Expand All @@ -44,9 +43,8 @@ public static StyxHostHttpClient create(ConnectionPool pool) {
public Publisher<LiveHttpResponse> sendRequest(LiveHttpRequest request) {
return Flux.from(pool.borrowConnection())
.flatMap(connection -> {
Publisher<LiveHttpResponse> write = RxReactiveStreams.toPublisher(connection.write(request));

return ResponseEventListener.from(write)
return ResponseEventListener.from(connection.write(request))
.whenCancelled(() -> pool.closeConnection(connection))
.whenResponseError(cause -> pool.closeConnection(connection))
.whenContentError(cause -> pool.closeConnection(connection))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.hotels.styx.client.ssl.SslContextFactory;
import io.netty.handler.ssl.SslContext;
import reactor.core.publisher.Mono;
import rx.RxReactiveStreams;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -117,18 +116,23 @@ static Mono<LiveHttpResponse> sendRequestInternal(NettyConnectionFactory connect

SslContext sslContext = getSslContext(params.https(), params.tlsSettings());

Mono<LiveHttpResponse> responseObservable = connectionFactory.createConnection(
return connectionFactory.createConnection(
origin,
new ConnectionSettings(params.connectTimeoutMillis()),
sslContext
).flatMap(connection ->
Mono.from(RxReactiveStreams.toPublisher(
connection.write(networkRequest)
.doOnTerminate(connection::close)))
Mono.from(connection.write(networkRequest)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the record:

As discussed, Publisher<LiveHttpResponse> cancel event could occur whilst connection is being used in flatMap operator. In this case the connection needs to be closed.

Let's look into this as a separate improvement.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use doFinally() here instead of having to include doOnComplete, doOnError, doOnCancel ...?

.doOnComplete(connection::close)
.doOnError(e -> connection.close())
.map(response -> response.newBuilder()
.body(it ->
it.doOnEnd(x -> connection.close())
.doOnCancel(() -> connection.close())
)
.build()
)
)
);

return responseObservable;

}

private static LiveHttpRequest addUserAgent(String userAgent, LiveHttpRequest request) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (C) 2013-2018 Expedia Inc.
Copyright (C) 2013-2019 Expedia Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -19,9 +19,9 @@
import com.google.common.base.Ticker;
import com.hotels.styx.api.LiveHttpRequest;
import com.hotels.styx.api.LiveHttpResponse;
import com.hotels.styx.client.Connection;
import com.hotels.styx.api.extension.Origin;
import rx.Observable;
import com.hotels.styx.client.Connection;
import reactor.core.publisher.Flux;

import java.util.function.Supplier;

Expand Down Expand Up @@ -53,7 +53,7 @@ public boolean isConnected() {
}

@Override
public Observable<LiveHttpResponse> write(LiveHttpRequest request) {
public Flux<LiveHttpResponse> write(LiveHttpRequest request) {
return nettyConnection.write(request);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (C) 2013-2018 Expedia Inc.
Copyright (C) 2013-2019 Expedia Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -21,8 +21,8 @@
import com.hotels.styx.api.extension.Origin;
import com.hotels.styx.client.Connection;
import com.hotels.styx.client.ConnectionSettings;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;

import static com.google.common.base.Objects.toStringHelper;

Expand Down Expand Up @@ -50,7 +50,7 @@ public StubConnection(Origin origin) {
}

@Override
public Observable<LiveHttpResponse> write(LiveHttpRequest request) {
public Flux<LiveHttpResponse> write(LiveHttpRequest request) {
throw new UnsupportedOperationException("Not implemented");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import rx.Observable;
import rx.Subscriber;

Expand Down Expand Up @@ -127,17 +129,17 @@ private static boolean requestIsOngoing(RequestBodyChunkSubscriber bodyChunkSubs
return bodyChunkSubscriber != null && bodyChunkSubscriber.requestIsOngoing();
}

public Observable<LiveHttpResponse> execute(NettyConnection nettyConnection) {
public Flux<LiveHttpResponse> execute(NettyConnection nettyConnection) {
AtomicReference<RequestBodyChunkSubscriber> requestRequestBodyChunkSubscriber = new AtomicReference<>();
requestTime = System.currentTimeMillis();
executeCount.incrementAndGet();

Observable<LiveHttpResponse> observable = Observable.create(subscriber -> {
Flux<LiveHttpResponse> responseFlux = Flux.<LiveHttpResponse>create(sink -> {
if (nettyConnection.isConnected()) {
RequestBodyChunkSubscriber bodyChunkSubscriber = new RequestBodyChunkSubscriber(request, nettyConnection);
requestRequestBodyChunkSubscriber.set(bodyChunkSubscriber);
addProxyBridgeHandlers(nettyConnection, subscriber);
new WriteRequestToOrigin(subscriber, nettyConnection, request, bodyChunkSubscriber)
addProxyBridgeHandlers(nettyConnection, sink);
new WriteRequestToOrigin(sink, nettyConnection, request, bodyChunkSubscriber)
.write();
if (requestLoggingEnabled) {
httpRequestMessageLogger.logRequest(request, nettyConnection.getOrigin());
Expand All @@ -146,13 +148,12 @@ public Observable<LiveHttpResponse> execute(NettyConnection nettyConnection) {
});

if (requestLoggingEnabled) {
observable = observable
responseFlux = responseFlux
.doOnNext(response -> {
httpRequestMessageLogger.logResponse(request, response);
});
}

return observable.map(response ->
return responseFlux.map(response ->
Requests.doFinally(response, cause -> {
if (nettyConnection.isConnected()) {
removeProxyBridgeHandlers(nettyConnection);
Expand All @@ -166,7 +167,7 @@ public Observable<LiveHttpResponse> execute(NettyConnection nettyConnection) {
}));
}

private void addProxyBridgeHandlers(NettyConnection nettyConnection, Subscriber<? super LiveHttpResponse> observer) {
private void addProxyBridgeHandlers(NettyConnection nettyConnection, FluxSink<LiveHttpResponse> sink) {
Origin origin = nettyConnection.getOrigin();
Channel channel = nettyConnection.channel();
channel.pipeline().addLast(IDLE_HANDLER_NAME, new IdleStateHandler(0, 0, responseTimeoutMillis, MILLISECONDS));
Expand All @@ -176,7 +177,7 @@ private void addProxyBridgeHandlers(NettyConnection nettyConnection, Subscriber<
new RequestsToOriginMetricsCollector(originStatsFactory.originStats(origin))));
channel.pipeline().addLast(
NettyToStyxResponsePropagator.NAME,
new NettyToStyxResponsePropagator(observer, origin, responseTimeoutMillis, MILLISECONDS, request));
new NettyToStyxResponsePropagator(sink, origin, responseTimeoutMillis, MILLISECONDS, request));
}

private void removeProxyBridgeHandlers(NettyConnection connection) {
Expand Down Expand Up @@ -221,14 +222,14 @@ public String toString() {
}

private static final class WriteRequestToOrigin {
private final Subscriber<? super LiveHttpResponse> responseFromOriginObserver;
private final FluxSink<LiveHttpResponse> responseFromOriginFlux;
private final NettyConnection nettyConnection;
private final LiveHttpRequest request;
private final RequestBodyChunkSubscriber requestBodyChunkSubscriber;

private WriteRequestToOrigin(Subscriber<? super LiveHttpResponse> responseFromOriginObserver, NettyConnection nettyConnection, LiveHttpRequest request,
private WriteRequestToOrigin(FluxSink<LiveHttpResponse> responseFromOriginFlux, NettyConnection nettyConnection, LiveHttpRequest request,
RequestBodyChunkSubscriber requestBodyChunkSubscriber) {
this.responseFromOriginObserver = responseFromOriginObserver;
this.responseFromOriginFlux = responseFromOriginFlux;
this.nettyConnection = nettyConnection;
this.request = request;
this.requestBodyChunkSubscriber = requestBodyChunkSubscriber;
Expand All @@ -242,7 +243,7 @@ public void write() {
originChannel.writeAndFlush(messageHeaders)
.addListener(subscribeToRequestBody());
} else {
responseFromOriginObserver.onError(new TransportLostException(originChannel.remoteAddress(), nettyConnection.getOrigin()));
responseFromOriginFlux.error(new TransportLostException(originChannel.remoteAddress(), nettyConnection.getOrigin()));
}
}

Expand All @@ -256,7 +257,7 @@ private ChannelFutureListener subscribeToRequestBody() {
String channelIdentifier = String.format("%s -> %s", nettyConnection.channel().localAddress(), nettyConnection.channel().remoteAddress());
LOGGER.error(format("Failed to send request headers. origin=%s connection=%s request=%s",
nettyConnection.getOrigin(), channelIdentifier, request), headersFuture.cause());
responseFromOriginObserver.onError(new TransportLostException(nettyConnection.channel().remoteAddress(), nettyConnection.getOrigin()));
responseFromOriginFlux.error(new TransportLostException(nettyConnection.channel().remoteAddress(), nettyConnection.getOrigin()));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.AttributeKey;
import rx.Observable;
import reactor.core.publisher.Flux;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -91,7 +91,7 @@ private static void addChannelHandlers(Channel channel, HttpConfig httpConfig, S
}

@Override
public Observable<LiveHttpResponse> write(LiveHttpRequest request) {
public Flux<LiveHttpResponse> write(LiveHttpRequest request) {
return this.requestOperationFactory.newHttpRequestOperation(request).execute(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import reactor.core.publisher.FluxSink;
import rx.Observable;
import rx.Producer;
import rx.Subscriber;
Expand All @@ -42,8 +43,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.hotels.styx.api.LiveHttpResponse.response;
import static com.hotels.styx.api.HttpResponseStatus.statusWithCode;
import static com.hotels.styx.api.LiveHttpResponse.response;
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
import static io.netty.util.ReferenceCountUtil.retain;
import static java.lang.String.format;
Expand All @@ -60,7 +61,7 @@ final class NettyToStyxResponsePropagator extends SimpleChannelInboundHandler {
private static final Logger LOGGER = getLogger(NettyToStyxResponsePropagator.class);

private final AtomicBoolean responseCompleted = new AtomicBoolean(false);
private final Subscriber<? super LiveHttpResponse> responseObserver;
private final FluxSink<LiveHttpResponse> sink;
private final LiveHttpRequest request;

private final Origin origin;
Expand All @@ -71,23 +72,16 @@ final class NettyToStyxResponsePropagator extends SimpleChannelInboundHandler {
// to be delivered from the same thread.
private boolean toBeClosed;

NettyToStyxResponsePropagator(Subscriber<? super LiveHttpResponse> responseObserver, Origin origin) {
this(responseObserver, origin, 5L, TimeUnit.SECONDS);
}

NettyToStyxResponsePropagator(Subscriber<? super LiveHttpResponse> responseObserver,
Origin origin,
long idleTimeout,
TimeUnit timeUnit) {
this(responseObserver, origin, idleTimeout, timeUnit, null);
NettyToStyxResponsePropagator(FluxSink<LiveHttpResponse> sink, Origin origin) {
this(sink, origin, 5L, TimeUnit.SECONDS, null);
}

NettyToStyxResponsePropagator(Subscriber<? super LiveHttpResponse> responseObserver,
NettyToStyxResponsePropagator(FluxSink<LiveHttpResponse> sink,
Origin origin,
long idleTimeout,
TimeUnit timeUnit,
LiveHttpRequest request) {
this.responseObserver = responseObserver;
this.sink = sink;
this.origin = origin;
this.idleTimeoutMillis = timeUnit.toMillis(idleTimeout);
this.request = request;
Expand Down Expand Up @@ -150,7 +144,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Except
}

LiveHttpResponse response = toStyxResponse(nettyResponse, contentObservable, origin);
this.responseObserver.onNext(response);
this.sink.next(response);
}
if (msg instanceof HttpContent) {
ByteBuf content = ((ByteBufHolder) msg).content();
Expand Down Expand Up @@ -210,13 +204,13 @@ private FlowControllingHttpContentProducer createProducer(ChannelHandlerContext

private void emitResponseCompleted() {
if (responseCompleted.compareAndSet(false, true)) {
responseObserver.onCompleted();
sink.complete();
}
}

private void emitResponseError(Throwable cause) {
if (responseCompleted.compareAndSet(false, true)) {
this.responseObserver.onError(cause);
this.sink.error(cause);
}
}

Expand Down
Loading