Skip to content

Commit

Permalink
Merge pull request #780 from mattrjacobs/allow-hooks-to-throw-safely
Browse files Browse the repository at this point in the history
More precise solution to #771
  • Loading branch information
mattrjacobs committed Apr 30, 2015
2 parents 89c2569 + 56e5664 commit 57df50b
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 42 deletions.
67 changes: 27 additions & 40 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -383,11 +383,8 @@ public void call(Subscriber<? super R> observer) {
metrics.incrementConcurrentExecutionCount();

// mark that we're starting execution on the ExecutionHook
try {
executionHook.onStart(_this);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onStart", hookEx);
}
// if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
executionHook.onStart(_this);

/* determine if we're allowed to execute */
if (circuitBreaker.allowRequest()) {
Expand Down Expand Up @@ -519,26 +516,21 @@ public void call(Subscriber<? super R> s) {
s.onError(new RuntimeException("timed out before executing run()"));
} else {
// not timed out so execute
try {
executionHook.onThreadStart(_self);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onThreadStart", hookEx);
}
try {
executionHook.onRunStart(_self);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onRunStart", hookEx);
}
try {
executionHook.onExecutionStart(_self);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onExecutionStart", hookEx);
}
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
isExecutedInThread.set(true);
/**
* If any of these hooks throw an exception, then it appears as if the actual execution threw an error
*/
try {
executionHook.onThreadStart(_self);
executionHook.onRunStart(_self);
executionHook.onExecutionStart(_self);
} catch (Throwable ex) {
s.onError(ex);
}
getExecutionObservableWithLifecycle().unsafeSubscribe(s); //the getExecutionObservableWithLifecycle method already wraps sync exceptions, so no need to catch here
}
}
Expand All @@ -551,19 +543,16 @@ public Boolean call() {
}));
} else {
// semaphore isolated
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
try {
executionHook.onRunStart(_self);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onRunStart", hookEx);
}
try {
executionHook.onExecutionStart(_self);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onExecutionStart", hookEx);
run = getExecutionObservableWithLifecycle(); //the getExecutionObservableWithLifecycle method already wraps sync exceptions, so this shouldn't throw
} catch (Throwable ex) {
//If the above hooks throw, then use that as the result of the run method
run = Observable.error(ex);
}
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
run = getExecutionObservableWithLifecycle(); //the getExecutionObservableWithLifecycle method already wraps sync exceptions, so no need to catch here
}

run = run.doOnEach(new Action1<Notification<? super R>>() {
Expand Down Expand Up @@ -753,19 +742,17 @@ private Observable<R> getFallbackOrThrowException(final HystrixEventType eventTy

// acquire a permit
if (fallbackSemaphore.tryAcquire()) {
if (isFallbackUserSupplied(this)) {
try {
try {
if (isFallbackUserSupplied(this)) {
executionHook.onFallbackStart(this);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onFallbackStart", hookEx);
fallbackExecutionChain = getFallbackObservable();
} else {
//same logic as above without the hook invocation
fallbackExecutionChain = getFallbackObservable();
}
}

try {
fallbackExecutionChain = getFallbackObservable();
} catch (Throwable t) {
// getFallback() is user provided and can throw so we catch it and turn it into Observable.error
fallbackExecutionChain = Observable.error(t);
} catch(Throwable ex) {
//If hook or user-fallback throws, then use that as the result of the fallback lookup
fallbackExecutionChain = Observable.error(ex);
}

fallbackExecutionChain = fallbackExecutionChain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -4206,6 +4207,68 @@ protected Integer getFallback() {
assertTrue(1 == new PrimaryCommand(new TestCircuitBreaker()).execute());
}

@Test
public void testOnRunStartHookThrows() {
final AtomicBoolean threadExceptionEncountered = new AtomicBoolean(false);
final AtomicBoolean semaphoreExceptionEncountered = new AtomicBoolean(false);
final AtomicBoolean onThreadStartInvoked = new AtomicBoolean(false);
final AtomicBoolean onThreadCompleteInvoked = new AtomicBoolean(false);

class FailureInjectionHook extends HystrixCommandExecutionHook {
@Override
public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
throw new HystrixRuntimeException(HystrixRuntimeException.FailureType.COMMAND_EXCEPTION, commandInstance.getClass(), "Injected Failure", null, null);
}

@Override
public <T> void onThreadStart(HystrixInvokable<T> commandInstance) {
onThreadStartInvoked.set(true);
super.onThreadStart(commandInstance);
}

@Override
public <T> void onThreadComplete(HystrixInvokable<T> commandInstance) {
onThreadCompleteInvoked.set(true);
super.onThreadComplete(commandInstance);
}
}

final FailureInjectionHook failureInjectionHook = new FailureInjectionHook();

class FailureInjectedCommand extends TestHystrixCommand<Integer> {
public FailureInjectedCommand(ExecutionIsolationStrategy isolationStrategy) {
super(testPropsBuilder().setCommandPropertiesDefaults(HystrixCommandPropertiesTest.getUnitTestPropertiesSetter().withExecutionIsolationStrategy(isolationStrategy)), failureInjectionHook);
}

@Override
protected Integer run() throws Exception {
return 3;
}
}

TestHystrixCommand<Integer> threadCmd = new FailureInjectedCommand(ExecutionIsolationStrategy.THREAD);
try {
int result = threadCmd.execute();
System.out.println("RESULT : " + result);
} catch (Throwable ex) {
ex.printStackTrace();
threadExceptionEncountered.set(true);
}
assertTrue(threadExceptionEncountered.get());
assertTrue(onThreadStartInvoked.get());
assertTrue(onThreadCompleteInvoked.get());

TestHystrixCommand<Integer> semaphoreCmd = new FailureInjectedCommand(ExecutionIsolationStrategy.SEMAPHORE);
try {
int result = semaphoreCmd.execute();
System.out.println("RESULT : " + result);
} catch (Throwable ex) {
ex.printStackTrace();
semaphoreExceptionEncountered.set(true);
}
assertTrue(semaphoreExceptionEncountered.get());
}

/* ******************************************************************************** */
/* ******************************************************************************** */
/* private HystrixCommand class implementations for unit testing */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.netflix.hystrix;

import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;

abstract public class TestHystrixCommand<T> extends HystrixCommand<T> implements AbstractTestHystrixCommand<T> {

private final TestCommandBuilder builder;
Expand All @@ -26,6 +28,13 @@ public TestHystrixCommand(TestCommandBuilder builder) {
this.builder = builder;
}

public TestHystrixCommand(TestCommandBuilder builder, HystrixCommandExecutionHook executionHook) {
super(builder.owner, builder.dependencyKey, builder.threadPoolKey, builder.circuitBreaker, builder.threadPool,
builder.commandPropertiesDefaults, builder.threadPoolPropertiesDefaults, builder.metrics,
builder.fallbackSemaphore, builder.executionSemaphore, TEST_PROPERTIES_FACTORY, executionHook);
this.builder = builder;
}

public TestCommandBuilder getBuilder() {
return builder;
}
Expand Down

0 comments on commit 57df50b

Please sign in to comment.