diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java index eb54f5a0a..a64afdd8d 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java @@ -506,8 +506,8 @@ private Observable getRunObservableDecoratedForMetricsAndErrorHandling(final @Override public void call(Subscriber s) { - executionHook.onRunStart(_self); executionHook.onThreadStart(_self); + executionHook.onRunStart(_self); if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { // the command timed out in the wrapping thread so we will return immediately // and not increment any of the counters below or other such logic @@ -561,7 +561,6 @@ public void call(Notification n) { @Override public R call(R t1) { - System.out.println("map " + t1); if (!once) { // report success executionResult = executionResult.addEvents(HystrixEventType.SUCCESS); @@ -617,6 +616,17 @@ public Observable call(Throwable t) { logger.warn("Error calling ExecutionHook.onRunError", hookException); } + try { + Exception decorated = executionHook.onError(_self, FailureType.BAD_REQUEST_EXCEPTION, (Exception) t); + + if (decorated instanceof HystrixBadRequestException) { + t = (HystrixBadRequestException) decorated; + } else { + logger.warn("ExecutionHook.onError returned an exception that was not an instance of HystrixBadRequestException so will be ignored.", decorated); + } + } catch (Exception hookException) { + logger.warn("Error calling ExecutionHook.onError", hookException); + } /* * HystrixBadRequestException is treated differently and allowed to propagate without any stats tracking or fallback logic */ @@ -750,15 +760,7 @@ private Observable getFallbackOrThrowException(final HystrixEventType eventTy executionResult = executionResult.addEvents(eventType); final AbstractCommand _cmd = this; - return getFallbackWithProtection().map(new Func1() { - - @Override - public R call(R t1) { - System.out.println(">>>>>>>>>>>> fallback on thread: " + Thread.currentThread()); - return executionHook.onComplete(_cmd, t1); - } - - }).doOnCompleted(new Action0() { + return getFallbackWithProtection().doOnCompleted(new Action0() { @Override public void call() { diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/exception/HystrixRuntimeException.java b/hystrix-core/src/main/java/com/netflix/hystrix/exception/HystrixRuntimeException.java index 6c1dd5052..b70a877f2 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/exception/HystrixRuntimeException.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/exception/HystrixRuntimeException.java @@ -32,7 +32,7 @@ public class HystrixRuntimeException extends RuntimeException { private final FailureType failureCause; public static enum FailureType { - COMMAND_EXCEPTION, TIMEOUT, SHORTCIRCUIT, REJECTED_THREAD_EXECUTION, REJECTED_SEMAPHORE_EXECUTION, REJECTED_SEMAPHORE_FALLBACK + BAD_REQUEST_EXCEPTION, COMMAND_EXCEPTION, TIMEOUT, SHORTCIRCUIT, REJECTED_THREAD_EXECUTION, REJECTED_SEMAPHORE_EXECUTION, REJECTED_SEMAPHORE_FALLBACK } public HystrixRuntimeException(FailureType failureCause, Class commandClass, String message, Exception cause, Throwable fallbackException) { diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java index 3d67997e9..5f2e1736b 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java @@ -31,7 +31,6 @@ import rx.Scheduler; import rx.functions.Action1; import rx.observers.TestSubscriber; -import rx.schedulers.Schedulers; import com.netflix.config.ConfigurationManager; import com.netflix.hystrix.AbstractCommand.TryableSemaphore; @@ -75,6 +74,10 @@ public void cleanup() { } } + private static void recordHookCall(StringBuilder sequenceRecorder, String methodName) { + sequenceRecorder.append(methodName).append(" - "); + } + /** * Test a successful command execution. */ @@ -3392,6 +3395,9 @@ public void testExecutionHookSuccessfulCommand() { assertEquals(1, command.builder.executionHook.threadStart.get()); assertEquals(1, command.builder.executionHook.threadComplete.get()); + // expected hook execution sequence + assertEquals("onStart - onThreadStart - onRunStart - onRunSuccess - onComplete - onThreadComplete - ", command.builder.executionHook.executionSequence.toString()); + /* test with queue() */ command = new SuccessfulTestCommand(); try { @@ -3424,6 +3430,10 @@ public void testExecutionHookSuccessfulCommand() { // thread execution assertEquals(1, command.builder.executionHook.threadStart.get()); assertEquals(1, command.builder.executionHook.threadComplete.get()); + + // expected hook execution sequence + assertEquals("onStart - onThreadStart - onRunStart - onRunSuccess - onComplete - onThreadComplete - ", command.builder.executionHook.executionSequence.toString()); + } /** @@ -3476,6 +3486,9 @@ public void testExecutionHookSuccessfulCommandViaFireAndForget() { // thread execution assertEquals(1, command.builder.executionHook.threadStart.get()); assertEquals(1, command.builder.executionHook.threadComplete.get()); + + // expected hook execution sequence + assertEquals("onStart - onThreadStart - onRunStart - onRunSuccess - onComplete - onThreadComplete - ", command.builder.executionHook.executionSequence.toString()); } /** @@ -3522,6 +3535,10 @@ public void testExecutionHookSuccessfulCommandWithMultipleGetsOnFuture() { // thread execution assertEquals(1, command.builder.executionHook.threadStart.get()); assertEquals(1, command.builder.executionHook.threadComplete.get()); + + // expected hook execution sequence + assertEquals("onStart - onThreadStart - onRunStart - onRunSuccess - onComplete - onThreadComplete - ", command.builder.executionHook.executionSequence.toString()); + } /** @@ -3565,6 +3582,10 @@ public void testExecutionHookRunFailureWithoutFallback() { assertEquals(1, command.builder.executionHook.threadStart.get()); assertEquals(1, command.builder.executionHook.threadComplete.get()); + // expected hook execution sequence + assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString()); + + /* test with queue() */ command = new UnknownFailureTestCommandWithoutFallback(); try { @@ -3601,6 +3622,8 @@ public void testExecutionHookRunFailureWithoutFallback() { assertEquals(1, command.builder.executionHook.threadStart.get()); assertEquals(1, command.builder.executionHook.threadComplete.get()); + // expected hook execution sequence + assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString()); } /** @@ -3637,6 +3660,10 @@ public void testExecutionHookRunFailureWithFallback() { assertEquals(1, command.builder.executionHook.threadStart.get()); assertEquals(1, command.builder.executionHook.threadComplete.get()); + // expected hook execution sequence + assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackSuccess - onComplete - ", command.builder.executionHook.executionSequence.toString()); + + /* test with queue() */ command = new KnownFailureTestCommandWithFallback(new TestCircuitBreaker()); try { @@ -3669,6 +3696,9 @@ public void testExecutionHookRunFailureWithFallback() { // thread execution assertEquals(1, command.builder.executionHook.threadStart.get()); assertEquals(1, command.builder.executionHook.threadComplete.get()); + + // expected hook execution sequence + assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackSuccess - onComplete - ", command.builder.executionHook.executionSequence.toString()); } /** @@ -3712,6 +3742,9 @@ public void testExecutionHookRunFailureWithFallbackFailure() { assertEquals(1, command.builder.executionHook.threadStart.get()); assertEquals(1, command.builder.executionHook.threadComplete.get()); + // expected hook execution sequence + assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString()); + /* test with queue() */ command = new KnownFailureTestCommandWithFallbackFailure(); try { @@ -3747,6 +3780,9 @@ public void testExecutionHookRunFailureWithFallbackFailure() { // thread execution assertEquals(1, command.builder.executionHook.threadStart.get()); assertEquals(1, command.builder.executionHook.threadComplete.get()); + + // expected hook execution sequence + assertEquals("onStart - onThreadStart - onRunStart - onThreadComplete - onRunError - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString()); } /** @@ -3795,6 +3831,10 @@ public void testExecutionHookTimeoutWithoutFallback() { // ignore } assertEquals(1, command.builder.executionHook.threadComplete.get()); + + // expected hook execution sequence + assertEquals("onStart - onThreadStart - onRunStart - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.builder.executionHook.executionSequence.toString()); + } /** @@ -3840,6 +3880,9 @@ public void testExecutionHookTimeoutWithFallback() { // ignore } assertEquals(1, command.builder.executionHook.threadComplete.get()); + + // expected hook execution sequence + assertEquals("onStart - onThreadStart - onRunStart - onFallbackStart - onFallbackSuccess - onComplete - onThreadComplete - ", command.builder.executionHook.executionSequence.toString()); } /** @@ -3892,6 +3935,10 @@ public void testExecutionHookRejectedWithFallback() { // thread execution assertEquals(0, command.builder.executionHook.threadStart.get()); assertEquals(0, command.builder.executionHook.threadComplete.get()); + + // expected hook execution sequence + assertEquals("onStart - onFallbackStart - onFallbackSuccess - onComplete - ", command.builder.executionHook.executionSequence.toString()); + } /** @@ -3937,6 +3984,9 @@ public void testExecutionHookShortCircuitedWithFallbackViaQueue() { // thread execution assertEquals(0, command.builder.executionHook.threadStart.get()); assertEquals(0, command.builder.executionHook.threadComplete.get()); + + // expected hook execution sequence + assertEquals("onStart - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString()); } /** @@ -3982,6 +4032,9 @@ public void testExecutionHookShortCircuitedWithFallbackViaExecute() { // thread execution assertEquals(0, command.builder.executionHook.threadStart.get()); assertEquals(0, command.builder.executionHook.threadComplete.get()); + + // expected hook execution sequence + assertEquals("onStart - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString()); } /** @@ -4020,6 +4073,9 @@ public void testExecutionHookSuccessfulCommandWithSemaphoreIsolation() { assertEquals(0, command.builder.executionHook.threadStart.get()); assertEquals(0, command.builder.executionHook.threadComplete.get()); + // expected hook execution sequence + assertEquals("onStart - onRunStart - onRunSuccess - onComplete - ", command.builder.executionHook.executionSequence.toString()); + /* test with queue() */ command = new TestSemaphoreCommand(new TestCircuitBreaker(), 1, 10); try { @@ -4054,6 +4110,9 @@ public void testExecutionHookSuccessfulCommandWithSemaphoreIsolation() { // thread execution assertEquals(0, command.builder.executionHook.threadStart.get()); assertEquals(0, command.builder.executionHook.threadComplete.get()); + + // expected hook execution sequence + assertEquals("onStart - onRunStart - onRunSuccess - onComplete - ", command.builder.executionHook.executionSequence.toString()); } /** @@ -4102,6 +4161,54 @@ public void testExecutionHookFailureWithSemaphoreIsolation() { // thread execution assertEquals(0, command.builder.executionHook.threadStart.get()); assertEquals(0, command.builder.executionHook.threadComplete.get()); + + // expected hook execution sequence + assertEquals("onStart - onFallbackStart - onFallbackError - onError - ", command.builder.executionHook.executionSequence.toString()); + } + + /** + * Execution hook on fail with HystrixBadRequest exception + */ + @Test + public void testExecutionHookFailedOnHystrixBadRequestWithSemaphoreIsolation() { + + TestSemaphoreCommandFailWithHystrixBadRequestException command = new TestSemaphoreCommandFailWithHystrixBadRequestException(new TestCircuitBreaker(), 1, 10); + try { + command.execute(); + fail("we expect a failure"); + } catch (Exception e) { + // expected + } + + assertFalse(command.isExecutedInThread()); + + // the run() method should run as we're not short-circuited or rejected + assertEquals(1, command.builder.executionHook.startRun.get()); + // we expect no response from run() + assertNull(command.builder.executionHook.runSuccessResponse); + // we expect an exception + assertNotNull(command.builder.executionHook.runFailureException); + + // the fallback() method should not be run as BadRequestException is handled by immediate propagation + assertEquals(0, command.builder.executionHook.startFallback.get()); + // null since it didn't run + assertNull(command.builder.executionHook.fallbackSuccessResponse); + // null since it didn't run + assertNull(command.builder.executionHook.fallbackFailureException); + + // the execute() method was used + assertEquals(1, command.builder.executionHook.startExecute.get()); + // we should not have a response from execute() + assertNull(command.builder.executionHook.endExecuteSuccessResponse); + // we should have a HystrixBadRequest exception since run() succeeded + assertNotNull(command.builder.executionHook.endExecuteFailureException); + + // thread execution + assertEquals(0, command.builder.executionHook.threadStart.get()); + assertEquals(0, command.builder.executionHook.threadComplete.get()); + + // expected hook execution sequence + assertEquals("onStart - onRunStart - onRunError - onError - ", command.builder.executionHook.executionSequence.toString()); } /** @@ -4421,6 +4528,29 @@ protected Boolean run() { } + private static class KnownHystrixBadRequestFailureTestCommandWithoutFallback extends TestHystrixCommand { + + public KnownHystrixBadRequestFailureTestCommandWithoutFallback(TestCircuitBreaker circuitBreaker) { + super(testPropsBuilder().setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics)); + } + + public KnownHystrixBadRequestFailureTestCommandWithoutFallback(TestCircuitBreaker circuitBreaker, boolean fallbackEnabled) { + super(testPropsBuilder().setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics) + .setCommandPropertiesDefaults(HystrixCommandPropertiesTest.getUnitTestPropertiesSetter().withFallbackEnabled(fallbackEnabled))); + } + + @Override + protected Boolean run() { + System.out.println("*** simulated failed with HystrixBadRequestException ***"); + throw new HystrixBadRequestException("we failed with a simulated issue"); + } + + @Override + protected Boolean getFallback() { + return false; + } + } + /** * Failed execution - fallback implementation successfully returns value. */ @@ -4846,6 +4976,36 @@ protected Boolean run() { } } + /** + * The run() will take time. No fallback implementation. + */ + private static class TestSemaphoreCommandFailWithHystrixBadRequestException extends TestHystrixCommand { + + private final long executionSleep; + + private TestSemaphoreCommandFailWithHystrixBadRequestException(TestCircuitBreaker circuitBreaker, int executionSemaphoreCount, long executionSleep) { + super(testPropsBuilder().setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics) + .setCommandPropertiesDefaults(HystrixCommandPropertiesTest.getUnitTestPropertiesSetter() + .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE) + .withExecutionIsolationSemaphoreMaxConcurrentRequests(executionSemaphoreCount))); + this.executionSleep = executionSleep; + } + + private TestSemaphoreCommandFailWithHystrixBadRequestException(TestCircuitBreaker circuitBreaker, TryableSemaphore semaphore, long executionSleep) { + super(testPropsBuilder().setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics) + .setCommandPropertiesDefaults(HystrixCommandPropertiesTest.getUnitTestPropertiesSetter() + .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)) + .setExecutionSemaphore(semaphore)); + this.executionSleep = executionSleep; + } + + @Override + protected Boolean run() { + System.out.print("*** simulated failed execution ***"); + throw new HystrixBadRequestException("we failed with a simulated issue"); + } + } + /** * Semaphore based command that allows caller to use latches to know when it has started and signal when it * would like the command to finish @@ -5141,11 +5301,13 @@ public String getCollapserPropertiesCacheKey(HystrixCollapserKey collapserKey, c private static class TestExecutionHook extends HystrixCommandExecutionHook { + StringBuilder executionSequence = new StringBuilder(); AtomicInteger startExecute = new AtomicInteger(); @Override public void onStart(HystrixInvokable commandInstance) { super.onStart(commandInstance); + recordHookCall(executionSequence, "onStart"); startExecute.incrementAndGet(); } @@ -5154,6 +5316,7 @@ public void onStart(HystrixInvokable commandInstance) { @Override public T onComplete(HystrixInvokable commandInstance, T response) { endExecuteSuccessResponse = response; + recordHookCall(executionSequence, "onComplete"); return super.onComplete(commandInstance, response); } @@ -5164,6 +5327,7 @@ public T onComplete(HystrixInvokable commandInstance, T response) { public Exception onError(HystrixInvokable commandInstance, FailureType failureType, Exception e) { endExecuteFailureException = e; endExecuteFailureType = failureType; + recordHookCall(executionSequence, "onError"); return super.onError(commandInstance, failureType, e); } @@ -5172,6 +5336,7 @@ public Exception onError(HystrixInvokable commandInstance, FailureType fa @Override public void onRunStart(HystrixInvokable commandInstance) { super.onRunStart(commandInstance); + recordHookCall(executionSequence, "onRunStart"); startRun.incrementAndGet(); } @@ -5180,6 +5345,7 @@ public void onRunStart(HystrixInvokable commandInstance) { @Override public T onRunSuccess(HystrixInvokable commandInstance, T response) { runSuccessResponse = response; + recordHookCall(executionSequence, "onRunSuccess"); return super.onRunSuccess(commandInstance, response); } @@ -5188,6 +5354,7 @@ public T onRunSuccess(HystrixInvokable commandInstance, T response) { @Override public Exception onRunError(HystrixInvokable commandInstance, Exception e) { runFailureException = e; + recordHookCall(executionSequence, "onRunError"); return super.onRunError(commandInstance, e); } @@ -5196,6 +5363,7 @@ public Exception onRunError(HystrixInvokable commandInstance, Exception e @Override public void onFallbackStart(HystrixInvokable commandInstance) { super.onFallbackStart(commandInstance); + recordHookCall(executionSequence, "onFallbackStart"); startFallback.incrementAndGet(); } @@ -5204,6 +5372,7 @@ public void onFallbackStart(HystrixInvokable commandInstance) { @Override public T onFallbackSuccess(HystrixInvokable commandInstance, T response) { fallbackSuccessResponse = response; + recordHookCall(executionSequence, "onFallbackSuccess"); return super.onFallbackSuccess(commandInstance, response); } @@ -5212,6 +5381,7 @@ public T onFallbackSuccess(HystrixInvokable commandInstance, T response) @Override public Exception onFallbackError(HystrixInvokable commandInstance, Exception e) { fallbackFailureException = e; + recordHookCall(executionSequence, "onFallbackError"); return super.onFallbackError(commandInstance, e); } @@ -5220,6 +5390,7 @@ public Exception onFallbackError(HystrixInvokable commandInstance, Except @Override public void onThreadStart(HystrixInvokable commandInstance) { super.onThreadStart(commandInstance); + recordHookCall(executionSequence, "onThreadStart"); threadStart.incrementAndGet(); } @@ -5228,6 +5399,7 @@ public void onThreadStart(HystrixInvokable commandInstance) { @Override public void onThreadComplete(HystrixInvokable commandInstance) { super.onThreadComplete(commandInstance); + recordHookCall(executionSequence, "onThreadComplete"); threadComplete.incrementAndGet(); }