Skip to content

Commit

Permalink
Add support for CompletableFuture for method return types
Browse files Browse the repository at this point in the history
Implements support for `CompletableFuture` on method return types by converting through RxJava `Observable`
  • Loading branch information
wjam committed Jul 8, 2018
1 parent fa30e55 commit e67c8f2
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 3 deletions.
10 changes: 8 additions & 2 deletions hystrix/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ GitHub github = HystrixFeign.builder()
.target(GitHub.class, "https://api.github.com");
```

For asynchronous or reactive use, return `HystrixCommand<YourType>`.
For asynchronous or reactive use, return `HystrixCommand<YourType>` or `CompletableFuture<YourType>`.

For RxJava compatibility, use `rx.Observable<YourType>` or `rx.Single<YourType>`. Rx types are <a href="http://reactivex.io/documentation/observable.html">cold</a>, which means a http call isn't made until there's a subscriber.

Methods that do *not* return [`HystrixCommand`](https://netflix.github.io/Hystrix/javadoc/com/netflix/hystrix/HystrixCommand.html), [`rx.Observable`](http://reactivex.io/RxJava/javadoc/rx/Observable.html) or [`rx.Single`] are still wrapped in a `HystrixCommand`, but `execute()` is automatically called for you.
Methods that do *not* return [`HystrixCommand`](https://netflix.github.io/Hystrix/javadoc/com/netflix/hystrix/HystrixCommand.html), `CompletableFuture`, [`rx.Observable`](http://reactivex.io/RxJava/javadoc/rx/Observable.html) or `rx.Single` are still wrapped in a `HystrixCommand`, but `execute()` is automatically called for you.

```java
interface YourApi {
Expand All @@ -27,6 +27,9 @@ interface YourApi {
@RequestLine("GET /yourtype/{id}")
Single<YourType> getYourTypeSingle(@Param("id") String id);

@RequestLine("GET /yourtype/{id}")
CompletableFuture<YourType> getYourTypeCompletableFuture(@Param("id") String id);

@RequestLine("GET /yourtype/{id}")
YourType getYourTypeSynchronous(@Param("id") String id);
}
Expand All @@ -46,6 +49,9 @@ api.getYourType("a").queue();
// for synchronous
api.getYourType("a").execute();

// or for a CompletableFuture
api.getYourTypeCompletableFuture("a").thenApply(o -> "b");

// or to apply hystrix to existing feign methods.
api.getYourTypeSynchronous("a");
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import com.netflix.hystrix.HystrixCommand;
import feign.Contract;
import feign.MethodMetadata;
Expand Down Expand Up @@ -63,6 +64,9 @@ public List<MethodMetadata> parseAndValidatateMetadata(Class<?> targetType) {
} else if (type instanceof ParameterizedType
&& ((ParameterizedType) type).getRawType().equals(Completable.class)) {
metadata.returnType(void.class);
} else if (type instanceof ParameterizedType
&& ((ParameterizedType) type).getRawType().equals(CompletableFuture.class)) {
metadata.returnType(resolveLastTypeParameter(type, CompletableFuture.class));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import feign.InvocationHandlerFactory.MethodHandler;
import feign.Target;
import feign.Util;
Expand Down Expand Up @@ -130,15 +133,21 @@ protected Object getFallback() {
} else if (isReturnsCompletable(method)) {
((Completable) result).await();
return null;
} else if (isReturnsCompletableFuture(method)) {
return ((Future) result).get();
} else {
return result;
}
} catch (IllegalAccessException e) {
// shouldn't happen as method is public due to being an interface
throw new AssertionError(e);
} catch (InvocationTargetException e) {
} catch (InvocationTargetException | ExecutionException e) {
// Exceptions on fallback are tossed by Hystrix
throw new AssertionError(e.getCause());
} catch (InterruptedException e) {
// Exceptions on fallback are tossed by Hystrix
Thread.currentThread().interrupt();
throw new AssertionError(e.getCause());
}
}
};
Expand All @@ -155,6 +164,8 @@ protected Object getFallback() {
return hystrixCommand.toObservable().toSingle();
} else if (isReturnsCompletable(method)) {
return hystrixCommand.toObservable().toCompletable();
} else if (isReturnsCompletableFuture(method)) {
return new ObservableCompletableFuture<>(hystrixCommand);
}
return hystrixCommand.execute();
}
Expand All @@ -171,6 +182,10 @@ private boolean isReturnsObservable(Method method) {
return Observable.class.isAssignableFrom(method.getReturnType());
}

private boolean isReturnsCompletableFuture(Method method) {
return CompletableFuture.class.isAssignableFrom(method.getReturnType());
}

private boolean isReturnsSingle(Method method) {
return Single.class.isAssignableFrom(method.getReturnType());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Copyright 2012-2018 The Feign Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package feign.hystrix;

import com.netflix.hystrix.HystrixCommand;
import rx.Subscription;
import java.util.concurrent.CompletableFuture;

final class ObservableCompletableFuture<T> extends CompletableFuture<T> {

private final Subscription sub;

ObservableCompletableFuture(final HystrixCommand<T> command) {
this.sub = command.toObservable().single().subscribe(ObservableCompletableFuture.this::complete,
ObservableCompletableFuture.this::completeExceptionally);
}


@Override
public boolean cancel(final boolean b) {
sub.unsubscribe();
return super.cancel(b);
}
}
72 changes: 72 additions & 0 deletions hystrix/src/test/java/feign/hystrix/HystrixBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import feign.FeignException;
import feign.Headers;
import feign.Param;
Expand Down Expand Up @@ -427,6 +431,66 @@ public void rxSingleListFallback() {
assertThat(testSubscriber.getOnNextEvents().get(0)).containsExactly("fallback");
}

@Test
public void completableFutureEmptyBody()
throws InterruptedException, ExecutionException, TimeoutException {
server.enqueue(new MockResponse());

TestInterface api = target();

CompletableFuture<String> completable = api.completableFuture();

assertThat(completable).isNotNull();

completable.get(5, TimeUnit.SECONDS);
}

@Test
public void completableFutureWithBody()
throws InterruptedException, ExecutionException, TimeoutException {
server.enqueue(new MockResponse().setBody("foo"));

TestInterface api = target();

CompletableFuture<String> completable = api.completableFuture();

assertThat(completable).isNotNull();

assertThat(completable.get(5, TimeUnit.SECONDS)).isEqualTo("foo");
}

@Test
public void completableFutureFailWithoutFallback() throws TimeoutException, InterruptedException {
server.enqueue(new MockResponse().setResponseCode(500));

TestInterface api = HystrixFeign.builder()
.target(TestInterface.class, "http://localhost:" + server.getPort());

CompletableFuture<String> completable = api.completableFuture();

assertThat(completable).isNotNull();

try {
completable.get(5, TimeUnit.SECONDS);
} catch (ExecutionException e) {
assertThat(e).hasCauseInstanceOf(HystrixRuntimeException.class);
}
}

@Test
public void completableFutureFallback()
throws InterruptedException, ExecutionException, TimeoutException {
server.enqueue(new MockResponse().setResponseCode(500));

TestInterface api = target();

CompletableFuture<String> completable = api.completableFuture();

assertThat(completable).isNotNull();

assertThat(completable.get(5, TimeUnit.SECONDS)).isEqualTo("fallback");
}

@Test
public void rxCompletableEmptyBody() {
server.enqueue(new MockResponse());
Expand Down Expand Up @@ -657,6 +721,9 @@ default HystrixCommand<String> defaultMethodReturningCommand() {

@RequestLine("GET /")
Completable completable();

@RequestLine("GET /")
CompletableFuture<String> completableFuture();
}

class FallbackTestInterface implements TestInterface {
Expand Down Expand Up @@ -742,5 +809,10 @@ public List<String> getList() {
public Completable completable() {
return Completable.complete();
}

@Override
public CompletableFuture<String> completableFuture() {
return CompletableFuture.completedFuture("fallback");
}
}
}

0 comments on commit e67c8f2

Please sign in to comment.