Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

final rx java removal #652

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions components/client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@
<artifactId>mockito-core</artifactId>
</dependency>

<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-reactive-streams</artifactId>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
Expand Down
5 changes: 0 additions & 5 deletions components/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@
<artifactId>reactor-core</artifactId>
</dependency>

<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
</dependency>

<dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
Expand Down
5 changes: 0 additions & 5 deletions components/proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@
<artifactId>styx-server</artifactId>
</dependency>

<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-reactive-streams</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,14 @@
import com.hotels.styx.api.plugins.spi.PluginException;
import com.hotels.styx.common.SimpleCache;
import org.slf4j.Logger;
import reactor.core.publisher.Flux;

import java.util.Map;

import static com.hotels.styx.api.HttpResponseStatus.BAD_REQUEST;
import static com.hotels.styx.api.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static java.util.Objects.requireNonNull;
import static org.slf4j.LoggerFactory.getLogger;
import static rx.Observable.error;
import static rx.RxReactiveStreams.toObservable;
import static rx.RxReactiveStreams.toPublisher;

/**
* Collects metrics on plugin.
Expand Down Expand Up @@ -86,10 +84,9 @@ public Map<String, HttpHandler> adminInterfaceHandlers() {
public Eventual<LiveHttpResponse> intercept(LiveHttpRequest request, Chain originalChain) {
StatusRecordingChain chain = new StatusRecordingChain(originalChain);
try {
return new Eventual<>(toPublisher(
toObservable(plugin.intercept(request, chain))
return new Eventual<>(Flux.from(plugin.intercept(request, chain))
.doOnNext(response -> recordStatusCode(chain, response))
.onErrorResumeNext(error -> error(recordAndWrapError(chain, error)))));
.onErrorResume(error -> Flux.error(recordAndWrapError(chain, error))));
} catch (Throwable e) {
recordException(e);
return Eventual.error(new PluginException(e, plugin.name()));
Expand Down Expand Up @@ -141,10 +138,9 @@ public Context context() {
@Override
public Eventual<LiveHttpResponse> proceed(LiveHttpRequest request) {
try {
return new Eventual<>(
toPublisher(toObservable(chain.proceed(request))
return new Eventual<>(Flux.from(chain.proceed(request))
.doOnNext(response -> upstreamStatus = response.status())
.doOnError(error -> upstreamException = true)));
.doOnError(error -> upstreamException = true));
} catch (RuntimeException | Error e) {
upstreamException = true;
throw e;
Expand Down
5 changes: 0 additions & 5 deletions components/server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@
<artifactId>styx-common</artifactId>
</dependency>

<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-reactive-streams</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (C) 2013-2019 Expedia Inc.
Copyright (C) 2013-2020 Expedia Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
Expand All @@ -36,7 +37,6 @@
import static io.netty.handler.codec.http.HttpHeaders.setTransferEncodingChunked;
import static io.netty.handler.codec.http.LastHttpContent.EMPTY_LAST_CONTENT;
import static java.util.Objects.requireNonNull;
import static rx.RxReactiveStreams.toObservable;

/**
* Netty HTTP response writer.
Expand Down Expand Up @@ -157,7 +157,7 @@ private void onWriteEmptyLastChunkOutcome(ChannelFuture writeOp) {
return future;
} catch (Throwable cause) {
LOGGER.warn("Failed to convert response headers. response={}, Cause={}", new Object[]{response, cause});
toObservable(response.body()).forEach(it -> Buffers.toByteBuf(it).release());
Flux.from(response.body().drop()).subscribe();
future.completeExceptionally(cause);
return future;
}
Expand Down
5 changes: 0 additions & 5 deletions plugin-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,6 @@
<artifactId>styx-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down
15 changes: 0 additions & 15 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@
<antlr.version>4.5.1-1</antlr.version>
<netty.version>4.1.42.Final</netty.version>
<netty-tcnative.version>2.0.26.Final</netty-tcnative.version>
<rxjava.version>1.1.6</rxjava.version>
<reactive-streams.version>1.0.2</reactive-streams.version>
<reactor.version>3.3.0.RELEASE</reactor.version>
<pcollections.version>3.0.3</pcollections.version>
Expand Down Expand Up @@ -195,7 +194,6 @@
<surefire.skip.tests>false</surefire.skip.tests>

<netty-transport-native-epoll.classifier/>
<rxjava-reactive-streams.version>1.1.1</rxjava-reactive-streams.version>
<mockk.version>1.9.3</mockk.version>
</properties>

Expand Down Expand Up @@ -254,19 +252,6 @@
<classifier>${netty-tcnative.classifier}</classifier>
</dependency>

<!-- RxJava -->
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>${rxjava.version}</version>
</dependency>

<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-reactive-streams</artifactId>
<version>${rxjava-reactive-streams.version}</version>
</dependency>

<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
Expand Down
10 changes: 0 additions & 10 deletions support/api-testsupport/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,6 @@
<scope>compile</scope>
</dependency>

<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
</dependency>

<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-reactive-streams</artifactId>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,18 @@
import com.hotels.styx.api.LiveHttpRequest;
import com.hotels.styx.api.LiveHttpResponse;
import com.hotels.styx.api.plugins.spi.Plugin;

import static rx.RxReactiveStreams.toObservable;
import static rx.RxReactiveStreams.toPublisher;
import reactor.core.publisher.Flux;

public class OnCompleteErrorPlugin implements Plugin {

@Override
public Eventual<LiveHttpResponse> intercept(LiveHttpRequest request, Chain chain) {

return new Eventual<>(toPublisher(toObservable(chain.proceed(request))
.doOnCompleted(() -> {
return new Eventual<>(Flux.from(chain.proceed(request))
.doOnComplete(() -> {
if (request.header("Fail_at_onCompleted").isPresent()) {
throw new RuntimeException("foobar");
}
})));
}));
}
}

This file was deleted.