Skip to content

Commit

Permalink
Merge pull request #647 from mattrjacobs/tie-command-property-to-thre…
Browse files Browse the repository at this point in the history
…ad-interrupt-2

Tie command property to thread interrupt
  • Loading branch information
mattrjacobs committed Feb 10, 2015
2 parents 10c221a + da29dae commit 66b5323
Show file tree
Hide file tree
Showing 6 changed files with 346 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,56 +86,56 @@ public void testGetUserSyncWithFallback() {
*/


@Test
public void testGetUserAsyncWithFallbackCommand() throws ExecutionException, InterruptedException {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
Future<User> f1 = userService.getUserAsyncFallbackCommand(" ", "name: ");

assertEquals("def", f1.get().getName());

assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
HystrixInvokableInfo<?> getUserAsyncFallbackCommand = getHystrixCommandByKey(
"getUserAsyncFallbackCommand");
com.netflix.hystrix.HystrixInvokableInfo firstFallbackCommand = getHystrixCommandByKey("firstFallbackCommand");
com.netflix.hystrix.HystrixInvokableInfo secondFallbackCommand = getHystrixCommandByKey("secondFallbackCommand");

assertEquals("getUserAsyncFallbackCommand", getUserAsyncFallbackCommand.getCommandKey().name());
// confirm that command has failed
assertTrue(getUserAsyncFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
// confirm that first fallback has failed
assertTrue(firstFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
// and that second fallback was successful
assertTrue(secondFallbackCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
} finally {
context.shutdown();
}
}

@Test
public void testGetUserSyncWithFallbackCommand() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
User u1 = userService.getUserSyncFallbackCommand(" ", "name: ");

assertEquals("def", u1.getName());
assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
HystrixInvokableInfo<?> getUserSyncFallbackCommand = getHystrixCommandByKey(
"getUserSyncFallbackCommand");
com.netflix.hystrix.HystrixInvokableInfo firstFallbackCommand = getHystrixCommandByKey("firstFallbackCommand");
com.netflix.hystrix.HystrixInvokableInfo secondFallbackCommand = getHystrixCommandByKey("secondFallbackCommand");

assertEquals("getUserSyncFallbackCommand", getUserSyncFallbackCommand.getCommandKey().name());
// confirm that command has failed
assertTrue(getUserSyncFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
// confirm that first fallback has failed
assertTrue(firstFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
// and that second fallback was successful
assertTrue(secondFallbackCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
} finally {
context.shutdown();
}
}
// @Test
// public void testGetUserAsyncWithFallbackCommand() throws ExecutionException, InterruptedException {
// HystrixRequestContext context = HystrixRequestContext.initializeContext();
// try {
// Future<User> f1 = userService.getUserAsyncFallbackCommand(" ", "name: ");
//
// assertEquals("def", f1.get().getName());
//
// assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
// HystrixInvokableInfo<?> getUserAsyncFallbackCommand = getHystrixCommandByKey(
// "getUserAsyncFallbackCommand");
// com.netflix.hystrix.HystrixInvokableInfo firstFallbackCommand = getHystrixCommandByKey("firstFallbackCommand");
// com.netflix.hystrix.HystrixInvokableInfo secondFallbackCommand = getHystrixCommandByKey("secondFallbackCommand");
//
// assertEquals("getUserAsyncFallbackCommand", getUserAsyncFallbackCommand.getCommandKey().name());
// // confirm that command has failed
// assertTrue(getUserAsyncFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
// // confirm that first fallback has failed
// assertTrue(firstFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
// // and that second fallback was successful
// assertTrue(secondFallbackCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
// } finally {
// context.shutdown();
// }
// }
//
// @Test
// public void testGetUserSyncWithFallbackCommand() {
// HystrixRequestContext context = HystrixRequestContext.initializeContext();
// try {
// User u1 = userService.getUserSyncFallbackCommand(" ", "name: ");
//
// assertEquals("def", u1.getName());
// assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
// HystrixInvokableInfo<?> getUserSyncFallbackCommand = getHystrixCommandByKey(
// "getUserSyncFallbackCommand");
// com.netflix.hystrix.HystrixInvokableInfo firstFallbackCommand = getHystrixCommandByKey("firstFallbackCommand");
// com.netflix.hystrix.HystrixInvokableInfo secondFallbackCommand = getHystrixCommandByKey("secondFallbackCommand");
//
// assertEquals("getUserSyncFallbackCommand", getUserSyncFallbackCommand.getCommandKey().name());
// // confirm that command has failed
// assertTrue(getUserSyncFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
// // confirm that first fallback has failed
// assertTrue(firstFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
// // and that second fallback was successful
// assertTrue(secondFallbackCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
// } finally {
// context.shutdown();
// }
// }


public static class UserService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Notification;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
Expand Down Expand Up @@ -521,8 +524,7 @@ public void call(Subscriber<? super R> s) {
getExecutionObservableWithLifecycle().unsafeSubscribe(s); //the getExecutionObservableWithLifecycle method already wraps sync exceptions, so no need to catch here
}
}

}).subscribeOn(threadPool.getScheduler());
}).subscribeOn(threadPool.getScheduler(properties.executionIsolationThreadInterruptOnTimeout().get()));
} else {
// semaphore isolated
executionHook.onRunStart(_self);
Expand Down Expand Up @@ -948,7 +950,6 @@ public void tick() {

timeoutRunnable.run();
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public interface HystrixThreadPool {

public Scheduler getScheduler();

public Scheduler getScheduler(boolean shouldInterruptThread);

/**
* Mark when a thread begins executing a command.
*/
Expand Down Expand Up @@ -153,7 +155,8 @@ public interface HystrixThreadPool {
private final BlockingQueue<Runnable> queue;
private final ThreadPoolExecutor threadPool;
private final HystrixThreadPoolMetrics metrics;
private final Scheduler scheduler;
private final Scheduler nonInterruptingScheduler;
private final Scheduler interruptingScheduler;

public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
Expand All @@ -164,7 +167,8 @@ public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThrea
concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
properties);
this.threadPool = metrics.getThreadPool();
this.scheduler = new HystrixContextScheduler(concurrencyStrategy, this);
this.nonInterruptingScheduler = new HystrixContextScheduler(concurrencyStrategy, this, false);
this.interruptingScheduler = new HystrixContextScheduler(concurrencyStrategy, this, true);

/* strategy: HystrixMetricsPublisherThreadPool */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
Expand All @@ -178,8 +182,18 @@ public ThreadPoolExecutor getExecutor() {

@Override
public Scheduler getScheduler() {
//by default, interrupt underlying threads on timeout
return getScheduler(true);
}

@Override
public Scheduler getScheduler(boolean shouldInterruptThread) {
touchConfig();
return scheduler;
if (shouldInterruptThread) {
return interruptingScheduler;
} else {
return nonInterruptingScheduler;
}
}

// allow us to change things via fast-properties by setting it each time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,14 @@ public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, S
}

public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool) {
this(concurrencyStrategy, threadPool, true);
}


public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, boolean shouldInterruptThread) {
this.concurrencyStrategy = concurrencyStrategy;
this.threadPool = threadPool;
this.actualScheduler = new ThreadPoolScheduler(threadPool);
this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
}

@Override
Expand Down Expand Up @@ -101,14 +106,16 @@ public Subscription schedule(Action0 action) {
private static class ThreadPoolScheduler extends Scheduler {

private final HystrixThreadPool threadPool;
private final boolean shouldInterruptThread;

public ThreadPoolScheduler(HystrixThreadPool threadPool) {
public ThreadPoolScheduler(HystrixThreadPool threadPool, boolean shouldInterruptThread) {
this.threadPool = threadPool;
this.shouldInterruptThread = shouldInterruptThread;
}

@Override
public Worker createWorker() {
return new ThreadPoolWorker(threadPool);
return new ThreadPoolWorker(threadPool, shouldInterruptThread);
}

}
Expand All @@ -126,9 +133,11 @@ private static class ThreadPoolWorker extends Worker {

private final HystrixThreadPool threadPool;
private final CompositeSubscription subscription = new CompositeSubscription();
private final boolean shouldInterruptThread;

public ThreadPoolWorker(HystrixThreadPool threadPool) {
public ThreadPoolWorker(HystrixThreadPool threadPool, boolean shouldInterruptThread) {
this.threadPool = threadPool;
this.shouldInterruptThread = shouldInterruptThread;
}

@Override
Expand All @@ -147,16 +156,19 @@ public Subscription schedule(final Action0 action) {
// don't schedule, we are unsubscribed
return Subscriptions.unsubscribed();
}


//Schedulers.submitTo(executor, action, subscription, shouldInterrupt);


// This is internal RxJava API but it is too useful.
ScheduledAction sa = new ScheduledAction(action);

subscription.add(sa);
sa.addParent(subscription);

Future<?> f = threadPool.getExecutor().submit(sa);
sa.add(f);

sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread));

return sa;
}
Expand All @@ -168,4 +180,26 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {

}

/**
* Very similar to rx.internal.schedulers.ScheduledAction.FutureCompleter, but with configurable interrupt behavior
*/
private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
private final Future<?> f;
private final boolean shouldInterruptThread;

private FutureCompleterWithConfigurableInterrupt(Future<?> f, boolean shouldInterruptThread) {
this.f = f;
this.shouldInterruptThread = shouldInterruptThread;
}

@Override
public void unsubscribe() {
f.cancel(shouldInterruptThread);
}
@Override
public boolean isUnsubscribed() {
return f.isCancelled();
}
}

}
Loading

0 comments on commit 66b5323

Please sign in to comment.