Skip to content

Commit

Permalink
Move the marking of the thread pool completion to the end of the Obse…
Browse files Browse the repository at this point in the history
…rvable chain (Netflix#377)

* Both thread pool metrics and the onThreadComplete execution hook now run later
* Modified behavior of HystrixCommand.isExecutedInThread() to match the Javadoc (Netflix#448)
** Now this returns true iff the Hystrix thread executed the run() method
  • Loading branch information
Matt Jacobs committed Jan 7, 2015
1 parent 8748f51 commit b0808cb
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,12 @@

import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;
import rx.subscriptions.CompositeSubscription;

Expand All @@ -54,7 +51,6 @@
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault;
import com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable;
import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
Expand Down Expand Up @@ -494,7 +490,6 @@ private Observable<R> getRunObservableDecoratedForMetricsAndErrorHandling(final
Observable<R> run = null;
if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) {
// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
isExecutedInThread.set(true);

run = Observable.create(new OnSubscribe<R>() {

Expand All @@ -512,15 +507,8 @@ public void call(Subscriber<? super R> s) {
threadPool.markThreadExecution();
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
getExecutionObservable().doOnTerminate(new Action0() {

@Override
public void call() {
// TODO is this actually the end of the thread?
threadPool.markThreadCompletion();
executionHook.onThreadComplete(_self);
}
}).unsafeSubscribe(s);
isExecutedInThread.set(true);
getExecutionObservable().unsafeSubscribe(s);
} catch (Throwable t) {
// the run() method is a user provided implementation so can throw instead of using Observable.onError
// so we catch it here and turn it into Observable.error
Expand Down Expand Up @@ -660,6 +648,10 @@ public void call() {
if (endCurrentThreadExecutingCommand.get() != null) {
endCurrentThreadExecutingCommand.get().call();
}
if (isExecutedInThread.get()) {
threadPool.markThreadCompletion();
executionHook.onThreadComplete(_self);
}
}
}).map(new Func1<R, R>() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3583,7 +3583,7 @@ public void testExecutionHookRunFailureWithoutFallback() {
assertEquals(1, command.builder.executionHook.threadComplete.get());

// expected hook execution sequence
assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString());
assertEquals("onStart - onThreadStart - onRunStart - onRunError - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());


/* test with queue() */
Expand Down Expand Up @@ -3623,7 +3623,7 @@ public void testExecutionHookRunFailureWithoutFallback() {
assertEquals(1, command.builder.executionHook.threadComplete.get());

// expected hook execution sequence
assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString());
assertEquals("onStart - onThreadStart - onRunStart - onRunError - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());
}

/**
Expand Down Expand Up @@ -3661,7 +3661,7 @@ public void testExecutionHookRunFailureWithFallback() {
assertEquals(1, command.builder.executionHook.threadComplete.get());

// expected hook execution sequence
assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackSuccess - onComplete - ", command.builder.executionHook.executionSequence.toString());
assertEquals("onStart - onThreadStart - onRunStart - onRunError - onFallbackStart - onFallbackSuccess - onComplete - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());


/* test with queue() */
Expand Down Expand Up @@ -3698,7 +3698,7 @@ public void testExecutionHookRunFailureWithFallback() {
assertEquals(1, command.builder.executionHook.threadComplete.get());

// expected hook execution sequence
assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackSuccess - onComplete - ", command.builder.executionHook.executionSequence.toString());
assertEquals("onStart - onThreadStart - onRunStart - onRunError - onFallbackStart - onFallbackSuccess - onComplete - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());
}

/**
Expand Down Expand Up @@ -3743,7 +3743,7 @@ public void testExecutionHookRunFailureWithFallbackFailure() {
assertEquals(1, command.builder.executionHook.threadComplete.get());

// expected hook execution sequence
assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString());
assertEquals("onStart - onThreadStart - onRunStart - onRunError - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());

/* test with queue() */
command = new KnownFailureTestCommandWithFallbackFailure();
Expand Down Expand Up @@ -3782,7 +3782,7 @@ public void testExecutionHookRunFailureWithFallbackFailure() {
assertEquals(1, command.builder.executionHook.threadComplete.get());

// expected hook execution sequence
assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString());
assertEquals("onStart - onThreadStart - onRunStart - onRunError - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5745,8 +5745,8 @@ public void testRejectionWithFallbackRequestContextWithThreadIsolatedSynchronous
System.out.println("results.observeOnThread.get(): " + results.observeOnThread.get() + " " + Thread.currentThread());
assertTrue(results.observeOnThread.get().equals(Thread.currentThread())); // rejected so we stay on calling thread

// thread isolated so even though we're rejected we mark that it attempted execution in a thread
assertTrue(results.command.isExecutedInThread());
// thread isolated, but rejected, so this is false
assertFalse(results.command.isExecutedInThread());
}

/**
Expand All @@ -5766,8 +5766,8 @@ public void testRejectionWithFallbackRequestContextWithThreadIsolatedAsynchronou
assertTrue(results.isContextInitializedObserveOn.get()); // we capture and set the context once the user provided Observable emits
assertTrue(results.observeOnThread.get().getName().startsWith("RxNewThread"));

// thread isolated so even though we're rejected we mark that it attempted execution in a thread
assertTrue(results.command.isExecutedInThread());
// thread isolated, but rejected, so this is false
assertFalse(results.command.isExecutedInThread());
}

/**
Expand All @@ -5785,8 +5785,8 @@ public void testRejectionWithFallbackRequestContextWithThreadIsolatedAsynchronou
assertTrue(results.isContextInitializedObserveOn.get()); // the user scheduler captures context
assertTrue(results.observeOnThread.get().getName().startsWith("RxNewThread")); // the user provided thread/scheduler for getFallback

// thread isolated so even though we're rejected we mark that it attempted execution in a thread
assertTrue(results.command.isExecutedInThread());
// thread isolated, but rejected, so this is false
assertFalse(results.command.isExecutedInThread());
}

/* *************************************** testShortCircuitedWithFallbackRequestContext *********************************** */
Expand Down

0 comments on commit b0808cb

Please sign in to comment.