From a9d69cc44fd9a374be232bb3f94825feb24e7bed Mon Sep 17 00:00:00 2001 From: Bradley Baetz Date: Tue, 9 Sep 2014 10:28:26 +1000 Subject: [PATCH] Add toObservable() to the common HystrixExecutable interface --- .../com/netflix/hystrix/HystrixCollapser.java | 5 ++ .../com/netflix/hystrix/HystrixCommand.java | 22 +------- .../netflix/hystrix/HystrixExecutable.java | 53 ++++++++++++++++++- .../hystrix/HystrixExecutableBase.java | 6 ++- .../hystrix/HystrixObservableCollapser.java | 5 ++ .../hystrix/HystrixObservableCommand.java | 2 + 6 files changed, 70 insertions(+), 23 deletions(-) diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java index 249e833e9..b1ea2522c 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java @@ -308,6 +308,7 @@ protected Collection} that executes and calls back with the result of of {@link HystrixCommand}{@code } execution after passing through {@link #mapResponseToRequests} * to transform the {@code } into {@code } */ + @Override public Observable observe() { // us a ReplaySubject to buffer the eagerly subscribed-to Observable ReplaySubject subject = ReplaySubject.create(); @@ -332,6 +333,7 @@ public Observable observe() { * @return {@code Observable} that lazily executes and calls back with the result of of {@link HystrixCommand}{@code } execution after passing through * {@link #mapResponseToRequests} to transform the {@code } into {@code } */ + @Override public Observable toObservable() { // when we callback with the data we want to do the work // on a separate thread than the one giving us the callback @@ -348,6 +350,7 @@ public Observable toObservable() { * @return {@code Observable} that lazily executes and calls back with the result of of {@link HystrixCommand}{@code } execution after passing through * {@link #mapResponseToRequests} to transform the {@code } into {@code } */ + @Override public Observable toObservable(Scheduler observeOn) { /* try from cache first */ @@ -397,6 +400,7 @@ public Observable toObservable(Scheduler observeOn) { * @throws HystrixRuntimeException * if an error occurs and a fallback cannot be retrieved */ + @Override public ResponseType execute() { try { return queue().get(); @@ -427,6 +431,7 @@ public ResponseType execute() { * @throws HystrixRuntimeException * within an ExecutionException.getCause() (thrown by {@link Future#get}) if an error occurs and a fallback cannot be retrieved */ + @Override public Future queue() { final Observable o = toObservable(); return o.toBlockingObservable().toFuture(); diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java index f7ee6a22e..0d409178f 100755 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java @@ -291,6 +291,7 @@ protected String getCacheKey() { * @throws IllegalStateException * if invoked more than once */ + @Override public Observable toObservable() { if (observableCommand.properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) { return toObservable(Schedulers.computation()); @@ -301,26 +302,7 @@ public Observable toObservable() { } } - /** - * A lazy {@link Observable} that will execute the command when subscribed to. - *

- * See https://github.com/Netflix/RxJava/wiki for more information. - * - * @param observeOn - * The {@link Scheduler} to execute callbacks on. - * @return {@code Observable} that lazily executes and calls back with the result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason. - * @throws HystrixRuntimeException - * if a fallback does not exist - *

- *

    - *
  • via {@code Observer#onError} if a failure occurs
  • - *
  • or immediately if the command can not be queued (such as short-circuited, thread-pool/semaphore rejected)
  • - *
- * @throws HystrixBadRequestException - * via {@code Observer#onError} if invalid arguments or state were used representing a user failure, not a system failure - * @throws IllegalStateException - * if invoked more than once - */ + @Override public Observable toObservable(Scheduler observeOn) { return toObservable(observeOn, true); } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixExecutable.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixExecutable.java index 77785e7c7..5c26c42f2 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixExecutable.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixExecutable.java @@ -23,6 +23,7 @@ import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy; import com.netflix.hystrix.exception.HystrixBadRequestException; import com.netflix.hystrix.exception.HystrixRuntimeException; +import rx.Scheduler; /** * Common interface for executables ({@link HystrixCommand} and {@link HystrixCollapser}) so client code can treat them the same and combine in typed collections if desired. @@ -64,7 +65,7 @@ public interface HystrixExecutable { * Used for asynchronous execution of command with a callback by subscribing to the {@link Observable}. *

* This eagerly starts execution of the command the same as {@link #queue()} and {@link #execute()}. - * A lazy {@link Observable} can be obtained from {@link HystrixCommand#toObservable()} or {@link HystrixCollapser#toObservable()}. + * A lazy {@link Observable} can be obtained from {@link #toObservable()}. *

* Callback Scheduling *

@@ -91,4 +92,54 @@ public interface HystrixExecutable { */ public Observable observe(); + /** + * A lazy {@link Observable} that will execute the command when subscribed to. + *

+ * See https://github.com/Netflix/RxJava/wiki for more information. + * + * @param observeOn + * The {@link Scheduler} to execute callbacks on. + * @return {@code Observable} that lazily executes and calls back with the result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason. + * @throws HystrixRuntimeException + * if a fallback does not exist + *

+ *

    + *
  • via {@code Observer#onError} if a failure occurs
  • + *
  • or immediately if the command can not be queued (such as short-circuited, thread-pool/semaphore rejected)
  • + *
+ * @throws HystrixBadRequestException + * via {@code Observer#onError} if invalid arguments or state were used representing a user failure, not a system failure + * @throws IllegalStateException + * if invoked more than once + */ + public Observable toObservable(Scheduler observeOn); + + /** + * A lazy {@link Observable} that will execute the command when subscribed to. + *

+ * Callback Scheduling + *

+ * The scheduler to use depends on the individual implementation + *

    + *
  • When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#threadPoolForComputation()} for callbacks.
  • + *
  • When using {@link ExecutionIsolationStrategy#SEMAPHORE} this defaults to using {@link Schedulers#immediate()} for callbacks.
  • + *
+ *

+ * See https://github.com/Netflix/RxJava/wiki for more information. + * + * @return {@code Observable} that lazily executes and calls back with the result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason. + * + * @throws HystrixRuntimeException + * if a fallback does not exist + *

+ *

    + *
  • via {@code Observer#onError} if a failure occurs
  • + *
  • or immediately if the command can not be queued (such as short-circuited, thread-pool/semaphore rejected)
  • + *
+ * @throws HystrixBadRequestException + * via {@code Observer#onError} if invalid arguments or state were used representing a user failure, not a system failure + * @throws IllegalStateException + * if invoked more than once + */ + public Observable toObservable(); } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixExecutableBase.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixExecutableBase.java index e76b0730f..10ac343b4 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixExecutableBase.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixExecutableBase.java @@ -291,6 +291,7 @@ protected HystrixExecutableBase(HystrixCommandGroupKey group, HystrixCommandKey * @throws IllegalStateException * if invoked more than once */ + @Override public R execute() { try { return queue().get(); @@ -321,6 +322,7 @@ public R execute() { * @throws IllegalStateException * if invoked more than once */ + @Override public Future queue() { /* * --- Schedulers.immediate() @@ -544,6 +546,7 @@ public R get(long timeout, TimeUnit unit) throws InterruptedException, Execution * @throws IllegalStateException * if invoked more than once */ + @Override public Observable observe() { // us a ReplaySubject to buffer the eagerly subscribed-to Observable ReplaySubject subject = ReplaySubject.create(); @@ -573,12 +576,11 @@ public Observable observe() { * @throws IllegalStateException * if invoked more than once */ + @Override public Observable toObservable(Scheduler observeOn) { return toObservable(observeOn, true); } - public abstract Observable toObservable(); - protected abstract ObservableCommand toObservable(final Scheduler observeOn, boolean performAsyncTimeout); /** diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java index 49da6a4c7..bfb244d45 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java @@ -319,6 +319,7 @@ protected Collection} that executes and calls back with the result of of {@link HystrixCommand}{@code } execution after passing through {@link #mapResponseToRequests} * to transform the {@code } into {@code } */ + @Override public Observable observe() { // us a ReplaySubject to buffer the eagerly subscribed-to Observable ReplaySubject subject = ReplaySubject.create(); @@ -343,6 +344,7 @@ public Observable observe() { * @return {@code Observable} that lazily executes and calls back with the result of of {@link HystrixCommand}{@code } execution after passing through * {@link #mapResponseToRequests} to transform the {@code } into {@code } */ + @Override public Observable toObservable() { // when we callback with the data we want to do the work // on a separate thread than the one giving us the callback @@ -359,6 +361,7 @@ public Observable toObservable() { * @return {@code Observable} that lazily executes and calls back with the result of of {@link HystrixCommand}{@code } execution after passing through * {@link #mapResponseToRequests} to transform the {@code } into {@code } */ + @Override public Observable toObservable(Scheduler observeOn) { /* try from cache first */ @@ -408,6 +411,7 @@ public Observable toObservable(Scheduler observeOn) { * @throws HystrixRuntimeException * if an error occurs and a fallback cannot be retrieved */ + @Override public ResponseType execute() { try { return queue().get(); @@ -438,6 +442,7 @@ public ResponseType execute() { * @throws HystrixRuntimeException * within an ExecutionException.getCause() (thrown by {@link Future#get}) if an error occurs and a fallback cannot be retrieved */ + @Override public Future queue() { final Observable o = toObservable(); return o.toBlockingObservable().toFuture(); diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCommand.java index d90159d11..1f479b1a5 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCommand.java @@ -246,10 +246,12 @@ protected Observable getFallback() { * @throws IllegalStateException * if invoked more than once */ + @Override public Observable toObservable() { return toObservable(Schedulers.immediate()); } + @Override protected ObservableCommand toObservable(final Scheduler observeOn, final boolean performAsyncTimeout) { /* this is a stateful object so can only be used once */ if (!started.compareAndSet(false, true)) {