diff --git a/components/client/pom.xml b/components/client/pom.xml index fddb51318d..92527bc94d 100644 --- a/components/client/pom.xml +++ b/components/client/pom.xml @@ -78,11 +78,6 @@ mockito-core - - io.reactivex - rxjava-reactive-streams - - io.projectreactor reactor-test diff --git a/components/common/pom.xml b/components/common/pom.xml index a658329669..56bff96f36 100644 --- a/components/common/pom.xml +++ b/components/common/pom.xml @@ -32,11 +32,6 @@ reactor-core - - io.reactivex - rxjava - - org.hdrhistogram HdrHistogram diff --git a/components/proxy/pom.xml b/components/proxy/pom.xml index 4ecbeebb30..b3dae866fd 100644 --- a/components/proxy/pom.xml +++ b/components/proxy/pom.xml @@ -35,11 +35,6 @@ styx-server - - io.reactivex - rxjava-reactive-streams - - io.netty netty-tcnative-boringssl-static diff --git a/components/proxy/src/main/java/com/hotels/styx/proxy/plugin/InstrumentedPlugin.java b/components/proxy/src/main/java/com/hotels/styx/proxy/plugin/InstrumentedPlugin.java index 4fc3669369..b8e54215ba 100644 --- a/components/proxy/src/main/java/com/hotels/styx/proxy/plugin/InstrumentedPlugin.java +++ b/components/proxy/src/main/java/com/hotels/styx/proxy/plugin/InstrumentedPlugin.java @@ -26,6 +26,7 @@ import com.hotels.styx.api.plugins.spi.PluginException; import com.hotels.styx.common.SimpleCache; import org.slf4j.Logger; +import reactor.core.publisher.Flux; import java.util.Map; @@ -33,9 +34,6 @@ import static com.hotels.styx.api.HttpResponseStatus.INTERNAL_SERVER_ERROR; import static java.util.Objects.requireNonNull; import static org.slf4j.LoggerFactory.getLogger; -import static rx.Observable.error; -import static rx.RxReactiveStreams.toObservable; -import static rx.RxReactiveStreams.toPublisher; /** * Collects metrics on plugin. @@ -86,10 +84,9 @@ public Map adminInterfaceHandlers() { public Eventual intercept(LiveHttpRequest request, Chain originalChain) { StatusRecordingChain chain = new StatusRecordingChain(originalChain); try { - return new Eventual<>(toPublisher( - toObservable(plugin.intercept(request, chain)) + return new Eventual<>(Flux.from(plugin.intercept(request, chain)) .doOnNext(response -> recordStatusCode(chain, response)) - .onErrorResumeNext(error -> error(recordAndWrapError(chain, error))))); + .onErrorResume(error -> Flux.error(recordAndWrapError(chain, error)))); } catch (Throwable e) { recordException(e); return Eventual.error(new PluginException(e, plugin.name())); @@ -141,10 +138,9 @@ public Context context() { @Override public Eventual proceed(LiveHttpRequest request) { try { - return new Eventual<>( - toPublisher(toObservable(chain.proceed(request)) + return new Eventual<>(Flux.from(chain.proceed(request)) .doOnNext(response -> upstreamStatus = response.status()) - .doOnError(error -> upstreamException = true))); + .doOnError(error -> upstreamException = true)); } catch (RuntimeException | Error e) { upstreamException = true; throw e; diff --git a/components/server/pom.xml b/components/server/pom.xml index 475ba4d044..83bec5e9d8 100644 --- a/components/server/pom.xml +++ b/components/server/pom.xml @@ -30,11 +30,6 @@ styx-common - - io.reactivex - rxjava-reactive-streams - - io.netty netty-tcnative-boringssl-static diff --git a/components/server/src/main/java/com/hotels/styx/server/netty/WebServerConnectorFactory.java b/components/server/src/main/java/com/hotels/styx/server/netty/WebServerConnectorFactory.java index ea5697eb10..5bc55bdcc7 100644 --- a/components/server/src/main/java/com/hotels/styx/server/netty/WebServerConnectorFactory.java +++ b/components/server/src/main/java/com/hotels/styx/server/netty/WebServerConnectorFactory.java @@ -1,5 +1,5 @@ /* - Copyright (C) 2013-2019 Expedia Inc. + Copyright (C) 2013-2020 Expedia Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpResponseWriter.java b/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpResponseWriter.java index 1987ce04a0..4396c1b912 100644 --- a/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpResponseWriter.java +++ b/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpResponseWriter.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.Flux; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; @@ -36,7 +37,6 @@ import static io.netty.handler.codec.http.HttpHeaders.setTransferEncodingChunked; import static io.netty.handler.codec.http.LastHttpContent.EMPTY_LAST_CONTENT; import static java.util.Objects.requireNonNull; -import static rx.RxReactiveStreams.toObservable; /** * Netty HTTP response writer. @@ -157,7 +157,7 @@ private void onWriteEmptyLastChunkOutcome(ChannelFuture writeOp) { return future; } catch (Throwable cause) { LOGGER.warn("Failed to convert response headers. response={}, Cause={}", new Object[]{response, cause}); - toObservable(response.body()).forEach(it -> Buffers.toByteBuf(it).release()); + Flux.from(response.body().drop()).subscribe(); future.completeExceptionally(cause); return future; } diff --git a/plugin-examples/pom.xml b/plugin-examples/pom.xml index bb5d93778d..691bed92e4 100644 --- a/plugin-examples/pom.xml +++ b/plugin-examples/pom.xml @@ -37,11 +37,6 @@ styx-api provided - - io.reactivex - rxjava - provided - com.fasterxml.jackson.core diff --git a/pom.xml b/pom.xml index c9013ca7d3..313bd52a6e 100644 --- a/pom.xml +++ b/pom.xml @@ -114,7 +114,6 @@ 4.5.1-1 4.1.42.Final 2.0.26.Final - 1.1.6 1.0.2 3.3.0.RELEASE 3.0.3 @@ -195,7 +194,6 @@ false - 1.1.1 1.9.3 @@ -254,19 +252,6 @@ ${netty-tcnative.classifier} - - - io.reactivex - rxjava - ${rxjava.version} - - - - io.reactivex - rxjava-reactive-streams - ${rxjava-reactive-streams.version} - - org.reactivestreams reactive-streams diff --git a/support/api-testsupport/pom.xml b/support/api-testsupport/pom.xml index 570b60f13c..aba7dc91e1 100755 --- a/support/api-testsupport/pom.xml +++ b/support/api-testsupport/pom.xml @@ -39,16 +39,6 @@ compile - - io.reactivex - rxjava - - - - io.reactivex - rxjava-reactive-streams - - org.junit.jupiter junit-jupiter diff --git a/system-tests/e2e-suite/src/test/java/com/hotels/styx/plugins/OnCompleteErrorPlugin.java b/system-tests/e2e-suite/src/test/java/com/hotels/styx/plugins/OnCompleteErrorPlugin.java index 8b628eb27e..81a562e5f2 100644 --- a/system-tests/e2e-suite/src/test/java/com/hotels/styx/plugins/OnCompleteErrorPlugin.java +++ b/system-tests/e2e-suite/src/test/java/com/hotels/styx/plugins/OnCompleteErrorPlugin.java @@ -19,20 +19,18 @@ import com.hotels.styx.api.LiveHttpRequest; import com.hotels.styx.api.LiveHttpResponse; import com.hotels.styx.api.plugins.spi.Plugin; - -import static rx.RxReactiveStreams.toObservable; -import static rx.RxReactiveStreams.toPublisher; +import reactor.core.publisher.Flux; public class OnCompleteErrorPlugin implements Plugin { @Override public Eventual intercept(LiveHttpRequest request, Chain chain) { - return new Eventual<>(toPublisher(toObservable(chain.proceed(request)) - .doOnCompleted(() -> { + return new Eventual<>(Flux.from(chain.proceed(request)) + .doOnComplete(() -> { if (request.header("Fail_at_onCompleted").isPresent()) { throw new RuntimeException("foobar"); } - }))); + })); } } diff --git a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/support/ImplicitScalaRxConversions.scala b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/support/ImplicitScalaRxConversions.scala deleted file mode 100644 index ffb3c900dc..0000000000 --- a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/support/ImplicitScalaRxConversions.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - Copyright (C) 2013-2018 Expedia Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ -package com.hotels.styx.support - -object ImplicitScalaRxConversions { - // Necessary type-mangling to make scala Observable[T] to java rx.Observable[T] possible. - // The original toJavaObservable returned a type rx.Observable[_ <: T] which is not compatible - // with required rx.Observable[T]. We whould investigate the root cause if this is a problem - // with plugin interface, or rx.lang.scala toJavaObservable, or something else. - implicit def toJavaObservable[T](s: rx.lang.scala.Observable[T]): rx.Observable[T] = rx.lang.scala.JavaConversions.toJavaObservable(s).asInstanceOf[rx.Observable[T]] -}