diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/Hystrix.java b/hystrix-core/src/main/java/com/netflix/hystrix/Hystrix.java index c55aa5081..37d9c641a 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/Hystrix.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/Hystrix.java @@ -51,6 +51,7 @@ public static void reset(long time, TimeUnit unit) { private static void _reset() { // clear metrics HystrixCommandMetrics.reset(); + HystrixThreadPoolMetrics.reset(); // clear collapsers HystrixCollapser.reset(); // clear circuit breakers diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java index 6338a06c1..4c7b0a8b1 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java @@ -168,8 +168,11 @@ public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThrea this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults); HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); this.queue = concurrencyStrategy.getBlockingQueue(properties.maxQueueSize().get()); - this.threadPool = concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue); - this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, threadPool, properties); + this.metrics = HystrixThreadPoolMetrics.getInstance( + threadPoolKey, + concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue), + properties); + this.threadPool = metrics.getThreadPool(); this.scheduler = new HystrixContextScheduler(concurrencyStrategy, this); /* strategy: HystrixMetricsPublisherThreadPool */ diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPoolMetrics.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPoolMetrics.java index 450a707f8..833af6371 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPoolMetrics.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPoolMetrics.java @@ -90,6 +90,14 @@ public static Collection getInstances() { return Collections.unmodifiableCollection(metrics.values()); } + /** + * Clears all state from metrics. If new requests come in instances will be recreated and metrics started from scratch. + * + */ + /* package */ static void reset() { + metrics.clear(); + } + private final HystrixThreadPoolKey threadPoolKey; private final HystrixRollingNumber counter; private final ThreadPoolExecutor threadPool; @@ -102,6 +110,15 @@ private HystrixThreadPoolMetrics(HystrixThreadPoolKey threadPoolKey, ThreadPoolE this.counter = new HystrixRollingNumber(properties.metricsRollingStatisticalWindowInMilliseconds(), properties.metricsRollingStatisticalWindowBuckets()); } + /** + * {@link ThreadPoolExecutor} this executor represents. + * + * @return ThreadPoolExecutor + */ + public ThreadPoolExecutor getThreadPool() { + return threadPool; + } + /** * {@link HystrixThreadPoolKey} these metrics represent. * diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherFactory.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherFactory.java index 9eee5a947..fb1ba3b6d 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherFactory.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherFactory.java @@ -47,11 +47,7 @@ public class HystrixMetricsPublisherFactory { /** * Get an instance of {@link HystrixMetricsPublisherThreadPool} with the given factory {@link HystrixMetricsPublisher} implementation for each {@link HystrixThreadPool} instance. - * - * @param metricsPublisher - * Implementation of {@link HystrixMetricsPublisher} to use. - *

- * See {@link HystrixMetricsPublisher} class header JavaDocs for precedence of how this is retrieved. + * * @param threadPoolKey * Pass-thru to {@link HystrixMetricsPublisher#getMetricsPublisherForThreadPool} implementation * @param metrics @@ -83,6 +79,23 @@ public static HystrixMetricsPublisherCommand createOrRetrievePublisherForCommand return SINGLETON.getPublisherForCommand(commandKey, commandOwner, metrics, circuitBreaker, properties); } + /** + * Resets the SINGLETON object. + * + */ + /* package */ static void reset() { + SINGLETON = new HystrixMetricsPublisherFactory(); + } + + /** + * Clears all state from publishers. If new requests come in instances will be recreated. + * + */ + /* package */ void _reset() { + commandPublishers.clear(); + threadPoolPublishers.clear(); + } + private final HystrixMetricsPublisher strategy; private HystrixMetricsPublisherFactory() { diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolTest.java index df8d66076..217f63ba8 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolTest.java @@ -1,14 +1,26 @@ package com.netflix.hystrix; -import static org.junit.Assert.*; +import com.netflix.hystrix.HystrixThreadPool.Factory; +import com.netflix.hystrix.strategy.HystrixPlugins; +import com.netflix.hystrix.strategy.HystrixPluginsTest; +import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher; +import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactory; +import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactoryTest; +import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherThreadPool; +import org.junit.Before; +import org.junit.Test; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.junit.Test; - -import com.netflix.hystrix.HystrixThreadPool.Factory; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.*; public class HystrixThreadPoolTest { + @Before + public void setup() { + Hystrix.reset(); + } @Test public void testShutdown() { @@ -45,4 +57,50 @@ public void testShutdownWithWait() { assertEquals(0, Factory.threadPools.size()); assertTrue(pool.getExecutor().isShutdown()); } + + private static class HystrixMetricsPublisherThreadPoolContainer implements HystrixMetricsPublisherThreadPool { + private final HystrixThreadPoolMetrics hystrixThreadPoolMetrics; + + private HystrixMetricsPublisherThreadPoolContainer(HystrixThreadPoolMetrics hystrixThreadPoolMetrics) { + this.hystrixThreadPoolMetrics = hystrixThreadPoolMetrics; + } + + @Override + public void initialize() { + } + + public HystrixThreadPoolMetrics getHystrixThreadPoolMetrics() { + return hystrixThreadPoolMetrics; + } + } + + @Test + public void ensureThreadPoolInstanceIsTheOneRegisteredWithMetricsPublisherAndThreadPoolCache() throws IllegalAccessException, NoSuchFieldException { + new HystrixPluginsTest().reset(); + HystrixPlugins.getInstance().registerMetricsPublisher(new HystrixMetricsPublisher() { + @Override + public HystrixMetricsPublisherThreadPool getMetricsPublisherForThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolMetrics metrics, HystrixThreadPoolProperties properties) { + return new HystrixMetricsPublisherThreadPoolContainer(metrics); + } + }); + new HystrixMetricsPublisherFactoryTest().reset(); + HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("threadPoolFactoryConcurrencyTest"); + HystrixThreadPool poolOne = new HystrixThreadPool.HystrixThreadPoolDefault( + threadPoolKey, HystrixThreadPoolProperties.Setter.getUnitTestPropertiesBuilder()); + HystrixThreadPool poolTwo = new HystrixThreadPool.HystrixThreadPoolDefault( + threadPoolKey, HystrixThreadPoolProperties.Setter.getUnitTestPropertiesBuilder()); + + assertThat(poolOne.getExecutor(), is(poolTwo.getExecutor())); //Now that we get the threadPool from the metrics object, this will always be equal + HystrixMetricsPublisherThreadPoolContainer hystrixMetricsPublisherThreadPool = + (HystrixMetricsPublisherThreadPoolContainer)HystrixMetricsPublisherFactory + .createOrRetrievePublisherForThreadPool(threadPoolKey, null, null); + ThreadPoolExecutor threadPoolExecutor = hystrixMetricsPublisherThreadPool.getHystrixThreadPoolMetrics().getThreadPool(); + + //assert that both HystrixThreadPools share the same ThreadPoolExecutor as the one in HystrixMetricsPublisherThreadPool + assertTrue(threadPoolExecutor.equals(poolOne.getExecutor()) && threadPoolExecutor.equals(poolTwo.getExecutor())); + assertFalse(threadPoolExecutor.isShutdown()); + + //Now the HystrixThreadPool ALWAYS has the same reference to the ThreadPoolExecutor so that it no longer matters which + //wins to be inserted into the HystrixThreadPool.Factory.threadPools cache. + } } diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherFactoryTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherFactoryTest.java index 352d7479a..ee0e333a5 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherFactoryTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherFactoryTest.java @@ -5,6 +5,7 @@ import java.util.ArrayList; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Before; import org.junit.Test; import com.netflix.hystrix.HystrixCircuitBreaker; @@ -17,6 +18,11 @@ import com.netflix.hystrix.HystrixThreadPoolProperties; public class HystrixMetricsPublisherFactoryTest { + @Before + public void reset() { + HystrixMetricsPublisherFactory.reset(); + } + /** * Assert that we only call a publisher once for a given Command or ThreadPool key. */