Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BugFix: ConcurrencyStrategy.wrapCallable was not being used on callbacks #190

Merged
merged 1 commit into from
Oct 2, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ public Observable<R> call(Throwable e) {
// don't waste overhead if it's the 'immediate' scheduler
// otherwise we'll 'observeOn' and wrap with the HystrixContextScheduler
// to copy state across threads (if threads are involved)
o = o.observeOn(new HystrixContextScheduler(observeOn));
o = o.observeOn(new HystrixContextScheduler(concurrencyStrategy, observeOn));
}

o = o.finallyDo(new Action0() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package com.netflix.hystrix.strategy.concurrency;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

import rx.Scheduler;
import rx.Subscription;
import rx.util.functions.Func2;
Expand All @@ -31,23 +34,53 @@ public class HystrixContextFunc2<T> implements Func2<Scheduler, T, Subscription>

private final Func2<? super Scheduler, ? super T, ? extends Subscription> actual;
private final HystrixRequestContext parentThreadState;
private final Callable<Subscription> c;

/*
* This is a workaround to needing to use Callable<Subscription> but
* needing to pass `Scheduler t1, T t2` into it after construction.
*
* Think of it like sticking t1 and t2 on the stack and then calling the function
* that uses them.
*
* This should all be thread-safe without issues despite multi-step execution
* because this Func2 is only ever executed once by Hystrix and construction will always
* precede `call` being invoked once.
*
*/
private final AtomicReference<Scheduler> t1Holder = new AtomicReference<Scheduler>();
private final AtomicReference<T> t2Holder = new AtomicReference<T>();

public HystrixContextFunc2(Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
public HystrixContextFunc2(final HystrixConcurrencyStrategy concurrencyStrategy, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
this.actual = action;
this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();

this.c = concurrencyStrategy.wrapCallable(new Callable<Subscription>() {

@Override
public Subscription call() throws Exception {
HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
try {
// set the state of this thread to that of its parent
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
// execute actual Func2 with the state of the parent
return actual.call(t1Holder.get(), t2Holder.get());
} finally {
// restore this thread back to its original state
HystrixRequestContext.setContextOnCurrentThread(existingState);
}
}
});
}

@Override
public Subscription call(Scheduler t1, T t2) {
HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
try {
// set the state of this thread to that of its parent
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
// execute actual Func2 with the state of the parent
return actual.call(t1, t2);
} finally {
// restore this thread back to its original state
HystrixRequestContext.setContextOnCurrentThread(existingState);
this.t1Holder.set(t1);
this.t2Holder.set(t2);
return c.call();
} catch (Exception e) {
throw new RuntimeException("Failed executing wrapped Func2", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,36 @@
*/
package com.netflix.hystrix.strategy.concurrency;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.util.functions.Func2;

/**
* Wrap a {@link Scheduler} so that scheduled actions are wrapped with {@link HystrixContextFunc2} so that
* Wrap a {@link Scheduler} so that scheduled actions are wrapped with {@link HystrixContextFunc2} so that
* the {@link HystrixRequestContext} is properly copied across threads (if they are used by the {@link Scheduler}).
*/
public class HystrixContextScheduler extends Scheduler {

private final HystrixConcurrencyStrategy concurrencyStrategy;
private final Scheduler actualScheduler;

public HystrixContextScheduler(Scheduler scheduler) {
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, Scheduler scheduler) {
this.actualScheduler = scheduler;
this.concurrencyStrategy = concurrencyStrategy;
}

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
return actualScheduler.schedule(state, new HystrixContextFunc2<T>(action));
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
return actualScheduler.schedule(state, new HystrixContextFunc2<T>(concurrencyStrategy, action));
}

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
return actualScheduler.schedule(state, new HystrixContextFunc2<T>(action), delayTime, unit);
return actualScheduler.schedule(state, new HystrixContextFunc2<T>(concurrencyStrategy, action), delayTime, unit);
}

}