Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port of #520 to master #526

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 49 additions & 33 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,16 @@ public void call() {
metrics.markSemaphoreRejection();
logger.debug("HystrixCommand Execution Rejection by Semaphore."); // debug only since we're throwing the exception and someone higher will do something with it
// retrieve a fallback or throw an exception if no fallback available
getFallbackOrThrowException(HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION, "could not acquire a semaphore for execution").unsafeSubscribe(observer);
getFallbackOrThrowException(HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION, "could not acquire a semaphore for execution").
map(new Func1<R, R>() {

@Override
public R call(R t1) {
// allow transforming the results via the executionHook if the fallback succeeds
return executionHook.onComplete(_this, t1);
}

}).unsafeSubscribe(observer);
}
} else {
// record that we are returning a short-circuited fallback
Expand Down Expand Up @@ -516,12 +525,7 @@ public void call(Subscriber<? super R> s) {
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
isExecutedInThread.set(true);
getExecutionObservable().map(new Func1<R, R>() {
@Override
public R call(R r) {
return executionHook.onRunSuccess(_self, r);
}
}).unsafeSubscribe(s);
getExecutionObservableWithLifecycle().unsafeSubscribe(s);
} catch (Throwable t) {
// the run() method is a user provided implementation so can throw instead of using Observable.onError
// so we catch it here and turn it into Observable.error
Expand All @@ -537,12 +541,7 @@ public R call(R r) {
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
try {
run = getExecutionObservable().map(new Func1<R, R>() {
@Override
public R call(R r) {
return executionHook.onRunSuccess(_self, r);
}
});
run = getExecutionObservableWithLifecycle();
} catch (Throwable t) {
// the run() method is a user provided implementation so can throw instead of using Observable.onError
// so we catch it here and turn it into Observable.error
Expand Down Expand Up @@ -605,18 +604,6 @@ public Observable<R> call(Throwable t) {
/**
* BadRequest handling
*/
try {
Exception decorated = executionHook.onRunError(_self, (Exception) t);

if (decorated instanceof HystrixBadRequestException) {
t = (HystrixBadRequestException) decorated;
} else {
logger.warn("ExecutionHook.onRunError returned an exception that was not an instance of HystrixBadRequestException so will be ignored.", decorated);
}
} catch (Exception hookException) {
logger.warn("Error calling ExecutionHook.onRunError", hookException);
}

try {
Exception decorated = executionHook.onError(_self, FailureType.BAD_REQUEST_EXCEPTION, (Exception) t);

Expand All @@ -633,21 +620,17 @@ public Observable<R> call(Throwable t) {
*/
return Observable.error(t);
} else {
/**
* All other error handling
*/
try {
e = executionHook.onRunError(_self, e);
} catch (Exception hookException) {
logger.warn("Error calling ExecutionHook.endRunFailure", hookException);
}
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {
return Observable.error(e);
}


/**
* All other error handling
*/
logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", e);

// report failure
Expand Down Expand Up @@ -686,6 +669,39 @@ public R call(R t1) {
return run;
}

private Observable<R> getExecutionObservableWithLifecycle() {
final HystrixInvokable<R> _self = this;

return getExecutionObservable().map(new Func1<R, R>() {
@Override
public R call(R r) {
return executionHook.onRunSuccess(_self, r);
}
}).onErrorResumeNext(new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
try {
Throwable wrappedThrowable = executionHook.onRunError(_self, (Exception) t);
return Observable.error(wrappedThrowable);
} catch (Throwable ex) {
logger.warn("Error calling ExecutionHook.onRunError", ex);
return Observable.error(t);
}

}
}).doOnTerminate(new Action0() {
@Override
public void call() {
//If the command timed out, then the calling thread has already walked away so we need
//to handle these markers. Otherwise, the calling thread will perform these for us.
if (isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
handleThreadEnd();

}
}
});
}

/**
* Execute <code>getFallback()</code> within protection of a semaphore that limits number of concurrent executions.
* <p>
Expand Down
13 changes: 0 additions & 13 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import com.netflix.hystrix.exception.HystrixBadRequestException;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import com.netflix.hystrix.exception.HystrixRuntimeException.FailureType;
Expand Down Expand Up @@ -306,15 +302,6 @@ public void call(Subscriber<? super R> s) {
}
}

}).doOnTerminate(new Action0() {
@Override
public void call() {
//If the command timed out, then the calling thread has already walked away so we need
//to handle these markers. Otherwise, the calling thread will perform these for us.
if (isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
handleThreadEnd();
}
}
});
}

Expand Down
Loading