Skip to content

Commit

Permalink
Merge pull request #1274 from mattrjacobs/wrap-everything-in-defer
Browse files Browse the repository at this point in the history
Make toObservable lazy.
  • Loading branch information
mattrjacobs authored Jul 10, 2016
2 parents 2bea399 + c6eba37 commit c45c208
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
(:use com.netflix.hystrix.core)
(:require [clojure.test :refer [deftest testing is are use-fixtures]])
(:import [com.netflix.hystrix Hystrix HystrixExecutable]
[com.netflix.hystrix.strategy.concurrency HystrixRequestContext]))
[com.netflix.hystrix.strategy.concurrency HystrixRequestContext]
[com.netflix.hystrix.exception HystrixRuntimeException]))

; reset hystrix after each execution, for consistency and sanity
(defn reset-fixture
Expand Down Expand Up @@ -145,9 +146,9 @@
(execute (instantiate (normalize (assoc base-def :run-fn str))
"hello" "-" "world"))))

(testing "throws IllegalStateException if called twice on same instance"
(testing "throws HystrixRuntimeException if called twice on same instance"
(let [instance (instantiate (normalize (assoc base-def :run-fn str)) "hi")]
(is (thrown? IllegalStateException
(is (thrown? HystrixRuntimeException
(execute instance)
(execute instance)))))

Expand All @@ -166,12 +167,6 @@
:command-key :my-command
:run-fn + }]

(testing "throws IllegalStateException if called twice on same instance"
(let [instance (instantiate (normalize base-def))]
(is (thrown? IllegalStateException
(queue instance)
(queue instance)))))

(testing "queues a HystrixCommand"
(is (= "hello-world")
(.get (queue (instantiate (normalize (assoc base-def :run-fn str))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import rx.Observable;
import rx.Subscription;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
import rx.subscriptions.MultipleAssignmentSubscription;
Expand Down
108 changes: 58 additions & 50 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -361,33 +361,8 @@ public void call() {
* if invoked more than once
*/
public Observable<R> toObservable() {
/* this is a stateful object so can only be used once */
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
throw new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
}

commandStartTimestamp = System.currentTimeMillis();

if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(this);
}
}

final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
final AbstractCommand<R> _cmd = this;

/* try from cache first */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}

//doOnCompleted handler already did all of the SUCCESS work
//doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work
final Action0 terminateCommandCleanup = new Action0() {
Expand Down Expand Up @@ -458,35 +433,68 @@ public void call() {
}
};

Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
/* this is a stateful object so can only be used once */
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
//TODO make a new error type for this
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
}

commandStartTimestamp = System.currentTimeMillis();

Observable<R> afterCache;
if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}

// put in cache
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, this);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();

/* try from cache first */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}

Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);


Observable<R> afterCache;

return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
// put in cache
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}

return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
}
});
}

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,11 @@ public void testExecutionMultipleTimes() {
// second should fail
command.execute();
fail("we should not allow this ... it breaks the state of request logs");
} catch (IllegalStateException e) {
} catch (HystrixRuntimeException e) {
e.printStackTrace();
// we want to get here
}

try {
// queue should also fail
command.queue();
fail("we should not allow this ... it breaks the state of request logs");
} catch (IllegalStateException e) {
e.printStackTrace();
// we want to get here
}
assertEquals(0, command.getBuilder().metrics.getCurrentConcurrentExecutionCount());
assertSaneHystrixRequestLog(1);
assertCommandExecutionEvents(command, HystrixEventType.SUCCESS);
Expand Down Expand Up @@ -2070,7 +2062,7 @@ public void testBasicExecutionWorksWithoutRequestVariable() throws Exception {
/**
* Test that if we try and execute a command with a cacheKey without initializing RequestVariable that it gives an error.
*/
@Test(expected = IllegalStateException.class)
@Test(expected = HystrixRuntimeException.class)
public void testCacheKeyExecutionRequiresRequestVariable() throws Exception {
/* force the RequestVariable to not be initialized */
HystrixRequestContext.setContextOnCurrentThread(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,7 @@ private void testObserveMultipleTimes(ExecutionIsolationStrategy isolationStrate
// second should fail
command.observe().toBlocking().single();
fail("we should not allow this ... it breaks the state of request logs");
} catch (IllegalStateException e) {
e.printStackTrace();
// we want to get here
}

try {
// queue should also fail
command.observe().toBlocking().toFuture();
fail("we should not allow this ... it breaks the state of request logs");
} catch (IllegalStateException e) {
} catch (HystrixRuntimeException e) {
e.printStackTrace();
// we want to get here
}
Expand Down

0 comments on commit c45c208

Please sign in to comment.