Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate Rx.Java from ConnectionPool and its consumers #343

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import com.hotels.styx.api.LiveHttpRequest;
import com.hotels.styx.api.LiveHttpResponse;
import rx.Observable;
import org.reactivestreams.Publisher;

/**
* HTTP Client that returns an observable of response.
Expand All @@ -33,5 +33,5 @@ public interface BackendServiceClient {
* In order to cancel the ongoing request, just unsubscribe from it.
*
*/
Observable<LiveHttpResponse> sendRequest(LiveHttpRequest request);
Publisher<LiveHttpResponse> sendRequest(LiveHttpRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
package com.hotels.styx.client;

import com.google.common.collect.ImmutableList;
import com.hotels.styx.api.LiveHttpRequest;
import com.hotels.styx.api.LiveHttpResponse;
import com.hotels.styx.api.HttpResponseStatus;
import com.hotels.styx.api.Id;
import com.hotels.styx.api.LiveHttpRequest;
import com.hotels.styx.api.LiveHttpResponse;
import com.hotels.styx.api.MetricRegistry;
import com.hotels.styx.api.RequestCookie;
import com.hotels.styx.api.ResponseEventListener;
Expand All @@ -34,8 +34,9 @@
import com.hotels.styx.client.retry.RetryNTimes;
import com.hotels.styx.client.stickysession.StickySessionLoadBalancingStrategy;
import com.hotels.styx.server.HttpInterceptorContext;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import rx.Observable;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -57,7 +58,6 @@
import static java.util.stream.Collectors.joining;
import static java.util.stream.StreamSupport.stream;
import static org.slf4j.LoggerFactory.getLogger;
import static rx.RxReactiveStreams.toObservable;

/**
* A configurable HTTP client that uses connection pooling, load balancing, etc.
Expand Down Expand Up @@ -99,7 +99,7 @@ private StyxBackendServiceClient(Builder builder) {
}

@Override
public Observable<LiveHttpResponse> sendRequest(LiveHttpRequest request) {
public Publisher<LiveHttpResponse> sendRequest(LiveHttpRequest request) {
return sendRequest(rewriteUrl(request), new ArrayList<>(), 0);
}

Expand Down Expand Up @@ -137,9 +137,9 @@ private static boolean isHeadRequest(LiveHttpRequest request) {
return request.method().equals(HEAD);
}

private Observable<LiveHttpResponse> sendRequest(LiveHttpRequest request, List<RemoteHost> previousOrigins, int attempt) {
private Publisher<LiveHttpResponse> sendRequest(LiveHttpRequest request, List<RemoteHost> previousOrigins, int attempt) {
if (attempt >= MAX_RETRY_ATTEMPTS) {
return Observable.error(new NoAvailableHostsException(this.id));
return Flux.error(new NoAvailableHostsException(this.id));
}

Optional<RemoteHost> remoteHost = selectOrigin(request);
Expand All @@ -148,16 +148,15 @@ private Observable<LiveHttpResponse> sendRequest(LiveHttpRequest request, List<R
List<RemoteHost> newPreviousOrigins = newArrayList(previousOrigins);
newPreviousOrigins.add(remoteHost.get());

return ResponseEventListener.from(
toObservable(host.hostClient().handle(request, HttpInterceptorContext.create()))
.map(response -> addStickySessionIdentifier(response, host.origin())))
return ResponseEventListener.from(host.hostClient().handle(request, HttpInterceptorContext.create())
.map(response -> addStickySessionIdentifier(response, host.origin())))
.whenResponseError(cause -> logError(request, cause))
.whenCancelled(() -> originStatsFactory.originStats(host.origin()).requestCancelled())
.apply()
.doOnNext(this::recordErrorStatusMetrics)
.map(response -> removeUnexpectedResponseBody(request, response))
.map(this::removeRedundantContentLengthHeader)
.onErrorResumeNext(cause -> {
.onErrorResume(cause -> {
RetryPolicyContext retryContext = new RetryPolicyContext(this.id, attempt + 1, cause, request, previousOrigins);
return retry(request, retryContext, newPreviousOrigins, attempt + 1, cause);
})
Expand All @@ -174,7 +173,7 @@ private LiveHttpResponse addOriginId(Id originId, LiveHttpResponse response) {
.build();
}

Observable<LiveHttpResponse> retry(LiveHttpRequest request, RetryPolicyContext retryContext, List<RemoteHost> previousOrigins, int attempt, Throwable cause) {
private Flux<LiveHttpResponse> retry(LiveHttpRequest request, RetryPolicyContext retryContext, List<RemoteHost> previousOrigins, int attempt, Throwable cause) {
LoadBalancer.Preferences lbContext = new LoadBalancer.Preferences() {
@Override
public Optional<String> preferredOrigins() {
Expand All @@ -190,9 +189,9 @@ public List<Origin> avoidOrigins() {
};

if (this.retryPolicy.evaluate(retryContext, loadBalancer, lbContext).shouldRetry()) {
return sendRequest(request, previousOrigins, attempt);
return Flux.from(sendRequest(request, previousOrigins, attempt));
} else {
return Observable.error(cause);
return Flux.error(cause);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.hotels.styx.api.extension.loadbalancing.spi.LoadBalancingMetricSupplier;
import com.hotels.styx.client.connectionpool.ConnectionPool;
import org.reactivestreams.Publisher;
import rx.Observable;
import reactor.core.publisher.Flux;
import rx.RxReactiveStreams;

import static java.util.Objects.requireNonNull;
Expand All @@ -42,18 +42,17 @@ public static StyxHostHttpClient create(ConnectionPool pool) {
}

public Publisher<LiveHttpResponse> sendRequest(LiveHttpRequest request) {
return RxReactiveStreams.toPublisher(
pool.borrowConnection()
.flatMap(connection -> {
Observable<LiveHttpResponse> write = connection.write(request);
return Flux.from(pool.borrowConnection())
.flatMap(connection -> {
Publisher<LiveHttpResponse> write = RxReactiveStreams.toPublisher(connection.write(request));

return ResponseEventListener.from(write)
.whenCancelled(() -> pool.closeConnection(connection))
.whenResponseError(cause -> pool.closeConnection(connection))
.whenContentError(cause -> pool.closeConnection(connection))
.whenCompleted(() -> pool.returnConnection(connection))
.apply();
}));
return ResponseEventListener.from(write)
.whenCancelled(() -> pool.closeConnection(connection))
.whenResponseError(cause -> pool.closeConnection(connection))
.whenContentError(cause -> pool.closeConnection(connection))
.whenCompleted(() -> pool.returnConnection(connection))
.apply();
});
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
*/
package com.hotels.styx.client.connectionpool;

import com.hotels.styx.client.Connection;
import com.hotels.styx.api.extension.Origin;
import rx.Observable;
import com.hotels.styx.api.extension.service.ConnectionPoolSettings;
import com.hotels.styx.client.Connection;
import org.reactivestreams.Publisher;

import java.io.Closeable;
import java.util.function.Function;
import com.hotels.styx.api.extension.service.ConnectionPoolSettings;

/**
* A pool of connections.
Expand Down Expand Up @@ -116,7 +115,7 @@ interface Factory {
*
* @return the borrowed connection
*/
Observable<Connection> borrowConnection();
Publisher<Connection> borrowConnection();

/**
* Returns back the connection to the host's pool. May close the connection if the
Expand Down Expand Up @@ -158,14 +157,6 @@ interface Factory {
*/
ConnectionPoolSettings settings();

default <T> Observable<T> withConnection(Function<Connection, Observable<T>> task) {
return borrowConnection()
.flatMap(connection ->
task.apply(connection)
.doOnCompleted(() -> returnConnection(connection))
.doOnError(throwable -> closeConnection(connection)));
}

/**
* Closes this pool and releases any system resources associated with it.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import static java.util.Objects.nonNull;
import static java.util.Objects.requireNonNull;
import static org.slf4j.LoggerFactory.getLogger;
import static rx.RxReactiveStreams.toObservable;

/**
* A connection pool implementation.
Expand Down Expand Up @@ -72,12 +71,7 @@ public Origin getOrigin() {
}

@Override
public Observable<Connection> borrowConnection() {
return toObservable(borrowConnection2());
}

@VisibleForTesting
Publisher<Connection> borrowConnection2() {
public Publisher<Connection> borrowConnection() {
return Mono.<Connection>create(sink -> {
Connection connection = dequeue();
if (connection != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
package com.hotels.styx.client.connectionpool;

import com.codahale.metrics.Gauge;
import com.hotels.styx.client.Connection;
import com.hotels.styx.api.extension.Origin;
import com.hotels.styx.api.MetricRegistry;
import com.hotels.styx.api.extension.Origin;
import com.hotels.styx.api.extension.service.ConnectionPoolSettings;
import com.hotels.styx.client.Connection;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import rx.Observable;

import static com.hotels.styx.client.applications.metrics.OriginMetrics.originMetricsScope;
import static java.util.Arrays.asList;
Expand All @@ -46,7 +46,7 @@ public Origin getOrigin() {
}

@Override
public Observable borrowConnection() {
public Publisher<Connection> borrowConnection() {
return connectionPool.borrowConnection();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,22 @@
*/
package com.hotels.styx.client

import java.util.concurrent.atomic.AtomicLong

import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder
import com.github.tomakehurst.wiremock.client.WireMock._
import com.google.common.base.Charsets._
import com.hotels.styx.api.HttpResponseStatus.OK
import com.hotels.styx.api.LiveHttpRequest.get
import com.hotels.styx.api.LiveHttpResponse
import com.hotels.styx.api.exceptions.ResponseTimeoutException
import com.hotels.styx.api.extension.Origin._
import com.hotels.styx.api.extension.loadbalancing.spi.LoadBalancer
import com.hotels.styx.api.extension.{ActiveOrigins, Origin}
import com.hotels.styx.api.exceptions.ResponseTimeoutException
import com.hotels.styx.api.HttpResponseStatus.OK
import com.hotels.styx.api.extension.service.BackendService
import com.hotels.styx.api.extension.{ActiveOrigins, Origin}
import com.hotels.styx.client.OriginsInventory.newOriginsInventoryBuilder
import StyxBackendServiceClient._
import com.hotels.styx.api.LiveHttpResponse
import com.hotels.styx.client.StyxBackendServiceClient._
import com.hotels.styx.client.loadbalancing.strategies.BusyConnectionsStrategy
import com.hotels.styx.support.api.BlockingObservables.{waitForResponse, waitForStreamingResponse}
import com.hotels.styx.support.server.FakeHttpServer
import com.hotels.styx.support.server.UrlMatchingStrategies._
import io.netty.buffer.Unpooled._
Expand All @@ -38,10 +39,14 @@ import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.HttpHeaders.Names._
import io.netty.handler.codec.http.HttpVersion._
import io.netty.handler.codec.http._
import org.reactivestreams.Subscription
import org.scalatest._
import org.scalatest.mock.MockitoSugar
import reactor.core.publisher.Mono
import rx.observers.TestSubscriber

import scala.util.Try

class BackendServiceClientSpec extends FunSuite with BeforeAndAfterAll with Matchers with BeforeAndAfter with MockitoSugar {
var webappOrigin: Origin = _

Expand Down Expand Up @@ -82,15 +87,19 @@ class BackendServiceClientSpec extends FunSuite with BeforeAndAfterAll with Matc

test("Emits an HTTP response even when content observable remains un-subscribed.") {
originOneServer.stub(urlStartingWith("/"), response200OkWithContentLengthHeader("Test message body."))
val response = waitForResponse(client.sendRequest(get("/foo/1").build()))
val response = Mono.from(client.sendRequest(get("/foo/1").build())).block()
assert(response.status() == OK, s"\nDid not get response with 200 OK status.\n$response\n")
}


test("Emits an HTTP response containing Content-Length from persistent connection that stays open.") {
originOneServer.stub(urlStartingWith("/"), response200OkWithContentLengthHeader("Test message body."))

val response = waitForResponse(client.sendRequest(get("/foo/2").build()))
val response = Mono.from(client.sendRequest(get("/foo/2").build()))
.flatMap((liveHttpResponse: LiveHttpResponse) => {
Mono.from(liveHttpResponse.aggregate(10000))
})
.block()

assert(response.status() == OK, s"\nDid not get response with 200 OK status.\n$response\n")
assert(response.bodyAs(UTF_8) == "Test message body.", s"\nReceived wrong/unexpected response body.")
Expand All @@ -100,26 +109,33 @@ class BackendServiceClientSpec extends FunSuite with BeforeAndAfterAll with Matc
ignore("Determines response content length from server closing the connection.") {
// originRespondingWith(response200OkFollowedFollowedByServerConnectionClose("Test message body."))

val response = waitForResponse(client.sendRequest(get("/foo/3").build()))
assert(response.status() == OK, s"\nDid not get response with 200 OK status.\n$response\n")
val response = Mono.from(client.sendRequest(get("/foo/3").build()))
.flatMap((liveHttpResponse: LiveHttpResponse) => {
Mono.from(liveHttpResponse.aggregate(10000))
})
.block()

assert(response.status() == OK, s"\nDid not get response with 200 OK status.\n$response\n")
assert(response.body().nonEmpty, s"\nResponse body is absent.")
assert(response.bodyAs(UTF_8) == "Test message body.", s"\nIncorrect response body.")
}

test("Emits onError when origin responds too slowly") {
val start = new AtomicLong()
originOneServer.stub(urlStartingWith("/"), aResponse
.withStatus(OK.code())
.withFixedDelay(3000))

client.sendRequest(get("/foo/4").build()).subscribe(testSubscriber)
val duration = time {
testSubscriber.awaitTerminalEvent()
val maybeResponse = Try {
Mono.from(client.sendRequest(get("/foo/4").build()))
.doOnSubscribe((t: Subscription) => start.set(System.currentTimeMillis()))
.block()
}

assert(testSubscriber.getOnErrorEvents.get(0).isInstanceOf[ResponseTimeoutException], "- Client emitted an incorrect exception!")
println("responseTimeout: " + duration)
duration shouldBe responseTimeout +- 250
val duration = System.currentTimeMillis() - start.get()

assert(maybeResponse.failed.get.isInstanceOf[ResponseTimeoutException], "- Client emitted an incorrect exception!")
duration shouldBe responseTimeout.toLong +- 250
}

def time[A](codeBlock: => A) = {
Expand All @@ -128,12 +144,11 @@ class BackendServiceClientSpec extends FunSuite with BeforeAndAfterAll with Matc
((System.nanoTime - s) / 1e6).asInstanceOf[Int]
}

private def response200OkWithContentLengthHeader(content: String): ResponseDefinitionBuilder = {
return aResponse
private def response200OkWithContentLengthHeader(content: String): ResponseDefinitionBuilder = aResponse
.withStatus(OK.code())
.withHeader(CONTENT_LENGTH, content.length.toString)
.withBody(content)
}


def response200OkFollowedFollowedByServerConnectionClose(content: String): (ChannelHandlerContext, Any) => Any = {
(ctx: ChannelHandlerContext, msg: scala.Any) => {
Expand Down
Loading