diff --git a/components/api/src/main/java/com/hotels/styx/api/StyxCoreObservable.java b/components/api/src/main/java/com/hotels/styx/api/StyxCoreObservable.java index 13408def29..4cfe68cdaf 100644 --- a/components/api/src/main/java/com/hotels/styx/api/StyxCoreObservable.java +++ b/components/api/src/main/java/com/hotels/styx/api/StyxCoreObservable.java @@ -25,7 +25,7 @@ /** * An observable that underpins the StyxObservable interface. * - * @param + * @param type of object published */ class StyxCoreObservable implements StyxObservable { private final Observable delegate; @@ -38,18 +38,6 @@ public StyxCoreObservable(CompletionStage future) { this.delegate = toObservable(future); } - private static Observable toObservable(CompletionStage future) { - return Observable.create(subscriber -> - future.whenComplete((result, error) -> { - if (error != null) { - subscriber.onError(error); - } else { - subscriber.onNext(result); - subscriber.onCompleted(); - } - })); - } - public static StyxObservable empty() { return new StyxCoreObservable<>(Observable.empty()); } @@ -62,31 +50,23 @@ public static StyxObservable error(Throwable cause) { return new StyxCoreObservable(Observable.error(cause)); } - public StyxObservable map(Function transformation) { + public StyxObservable map(Function transformation) { return new StyxCoreObservable<>(delegate.map(transformation::apply)); } - public StyxObservable flatMap(Function> transformation) { + public StyxObservable flatMap(Function> transformation) { return new StyxCoreObservable<>(delegate.flatMap(response -> toObservable(transformation.apply(response)))); } - private static Observable toObservable(StyxObservable styxObservable) { - return styxObservable instanceof StyxCoreObservable - ? ((StyxCoreObservable) styxObservable).delegate - : toObservable(styxObservable.asCompletableFuture()); - } - - public StyxObservable reduce(BiFunction accumulator, U seed) { + public StyxObservable reduce(BiFunction accumulator, R seed) { return new StyxCoreObservable<>(delegate.reduce(seed, (result, element) -> accumulator.apply(element, result))); } @Override - public StyxObservable onError(Function> errorHandler) { - return new StyxCoreObservable<>(delegate.onErrorResumeNext(cause -> { - StyxCoreObservable result = (StyxCoreObservable) errorHandler.apply(cause); - return result.delegate(); - })); + public StyxObservable onError(Function> errorHandler) { + return new StyxCoreObservable<>(delegate.onErrorResumeNext(cause -> + toObservable(errorHandler.apply(cause)))); } public Observable delegate() { @@ -103,4 +83,21 @@ private static CompletableFuture fromSingleObservable(Observable obser return future; } + private static Observable toObservable(StyxObservable styxObservable) { + return styxObservable instanceof StyxCoreObservable + ? ((StyxCoreObservable) styxObservable).delegate + : toObservable(styxObservable.asCompletableFuture()); + } + + private static Observable toObservable(CompletionStage future) { + return Observable.create(subscriber -> + future.whenComplete((result, error) -> { + if (error != null) { + subscriber.onError(error); + } else { + subscriber.onNext(result); + subscriber.onCompleted(); + } + })); + } } diff --git a/components/api/src/main/java/com/hotels/styx/api/StyxObservable.java b/components/api/src/main/java/com/hotels/styx/api/StyxObservable.java index 837c1ea3ec..b592ae850d 100644 --- a/components/api/src/main/java/com/hotels/styx/api/StyxObservable.java +++ b/components/api/src/main/java/com/hotels/styx/api/StyxObservable.java @@ -27,16 +27,16 @@ *

* This interface provides a is *not* intended for plugins to extend. * - * @param + * @param type of object published */ public interface StyxObservable { - StyxObservable map(Function transformation); + StyxObservable map(Function transformation); - StyxObservable flatMap(Function> transformation); + StyxObservable flatMap(Function> transformation); - StyxObservable reduce(BiFunction accumulator, U initialValue); + StyxObservable reduce(BiFunction accumulator, R initialValue); - StyxObservable onError(Function> errorHandler); + StyxObservable onError(Function> errorHandler); /** * Converts this observable to a completable future. Note that in order to do this, it must @@ -62,7 +62,7 @@ static StyxObservable of(T value) { return new StyxCoreObservable<>(Observable.just(value)); } - static StyxObservable from(Iterable values) { + static StyxObservable from(Iterable values) { return new StyxCoreObservable<>(Observable.from(values)); } diff --git a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AsyncRequestContentSpec.scala b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AsyncRequestContentSpec.scala index 5c7f239852..c9d4861d49 100644 --- a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AsyncRequestContentSpec.scala +++ b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AsyncRequestContentSpec.scala @@ -94,8 +94,12 @@ class AsyncRequestContentDelayPlugin extends PluginAdapter { Thread.sleep(1000) Observable.just(byteBuf) }) - StyxObservable.of(request) + + // This was split apart as it no longer compiles without the type annotation StyxObservable[HttpRequest] + val mapped: StyxObservable[HttpRequest] = StyxObservable.of(request) .map(asJavaFunction((request: HttpRequest) => request.newBuilder().body(fromRxObservable(contentTransformation)).build())) + + mapped .flatMap(asJavaFunction((request: HttpRequest) => chain.proceed(request))) } }