Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Race condition where the HystrixMetricsPublisherThreadPool isn't referencing the correct ThreadPoolExecutor instance #270

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public static void reset(long time, TimeUnit unit) {
private static void _reset() {
// clear metrics
HystrixCommandMetrics.reset();
HystrixThreadPoolMetrics.reset();
// clear collapsers
HystrixCollapser.reset();
// clear circuit breakers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,11 @@ public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThrea
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
this.queue = concurrencyStrategy.getBlockingQueue(properties.maxQueueSize().get());
this.threadPool = concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue);
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, threadPool, properties);
this.metrics = HystrixThreadPoolMetrics.getInstance(
threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
properties);
this.threadPool = metrics.getThreadPool();
this.scheduler = new HystrixContextScheduler(concurrencyStrategy, this);

/* strategy: HystrixMetricsPublisherThreadPool */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ public static Collection<HystrixThreadPoolMetrics> getInstances() {
return Collections.unmodifiableCollection(metrics.values());
}

/**
* Clears all state from metrics. If new requests come in instances will be recreated and metrics started from scratch.
*
*/
/* package */ static void reset() {
metrics.clear();
}

private final HystrixThreadPoolKey threadPoolKey;
private final HystrixRollingNumber counter;
private final ThreadPoolExecutor threadPool;
Expand All @@ -102,6 +110,15 @@ private HystrixThreadPoolMetrics(HystrixThreadPoolKey threadPoolKey, ThreadPoolE
this.counter = new HystrixRollingNumber(properties.metricsRollingStatisticalWindowInMilliseconds(), properties.metricsRollingStatisticalWindowBuckets());
}

/**
* {@link ThreadPoolExecutor} this executor represents.
*
* @return ThreadPoolExecutor
*/
public ThreadPoolExecutor getThreadPool() {
return threadPool;
}

/**
* {@link HystrixThreadPoolKey} these metrics represent.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,7 @@ public class HystrixMetricsPublisherFactory {

/**
* Get an instance of {@link HystrixMetricsPublisherThreadPool} with the given factory {@link HystrixMetricsPublisher} implementation for each {@link HystrixThreadPool} instance.
*
* @param metricsPublisher
* Implementation of {@link HystrixMetricsPublisher} to use.
* <p>
* See {@link HystrixMetricsPublisher} class header JavaDocs for precedence of how this is retrieved.
*
* @param threadPoolKey
* Pass-thru to {@link HystrixMetricsPublisher#getMetricsPublisherForThreadPool} implementation
* @param metrics
Expand Down Expand Up @@ -83,6 +79,23 @@ public static HystrixMetricsPublisherCommand createOrRetrievePublisherForCommand
return SINGLETON.getPublisherForCommand(commandKey, commandOwner, metrics, circuitBreaker, properties);
}

/**
* Resets the SINGLETON object.
*
*/
/* package */ static void reset() {
SINGLETON = new HystrixMetricsPublisherFactory();
}

/**
* Clears all state from publishers. If new requests come in instances will be recreated.
*
*/
/* package */ void _reset() {
commandPublishers.clear();
threadPoolPublishers.clear();
}

private final HystrixMetricsPublisher strategy;

private HystrixMetricsPublisherFactory() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
package com.netflix.hystrix;

import static org.junit.Assert.*;
import com.netflix.hystrix.HystrixThreadPool.Factory;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.HystrixPluginsTest;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactory;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactoryTest;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherThreadPool;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

import com.netflix.hystrix.HystrixThreadPool.Factory;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.*;

public class HystrixThreadPoolTest {
@Before
public void setup() {
Hystrix.reset();
}

@Test
public void testShutdown() {
Expand Down Expand Up @@ -45,4 +57,50 @@ public void testShutdownWithWait() {
assertEquals(0, Factory.threadPools.size());
assertTrue(pool.getExecutor().isShutdown());
}

private static class HystrixMetricsPublisherThreadPoolContainer implements HystrixMetricsPublisherThreadPool {
private final HystrixThreadPoolMetrics hystrixThreadPoolMetrics;

private HystrixMetricsPublisherThreadPoolContainer(HystrixThreadPoolMetrics hystrixThreadPoolMetrics) {
this.hystrixThreadPoolMetrics = hystrixThreadPoolMetrics;
}

@Override
public void initialize() {
}

public HystrixThreadPoolMetrics getHystrixThreadPoolMetrics() {
return hystrixThreadPoolMetrics;
}
}

@Test
public void ensureThreadPoolInstanceIsTheOneRegisteredWithMetricsPublisherAndThreadPoolCache() throws IllegalAccessException, NoSuchFieldException {
new HystrixPluginsTest().reset();
HystrixPlugins.getInstance().registerMetricsPublisher(new HystrixMetricsPublisher() {
@Override
public HystrixMetricsPublisherThreadPool getMetricsPublisherForThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolMetrics metrics, HystrixThreadPoolProperties properties) {
return new HystrixMetricsPublisherThreadPoolContainer(metrics);
}
});
new HystrixMetricsPublisherFactoryTest().reset();
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("threadPoolFactoryConcurrencyTest");
HystrixThreadPool poolOne = new HystrixThreadPool.HystrixThreadPoolDefault(
threadPoolKey, HystrixThreadPoolProperties.Setter.getUnitTestPropertiesBuilder());
HystrixThreadPool poolTwo = new HystrixThreadPool.HystrixThreadPoolDefault(
threadPoolKey, HystrixThreadPoolProperties.Setter.getUnitTestPropertiesBuilder());

assertThat(poolOne.getExecutor(), is(poolTwo.getExecutor())); //Now that we get the threadPool from the metrics object, this will always be equal
HystrixMetricsPublisherThreadPoolContainer hystrixMetricsPublisherThreadPool =
(HystrixMetricsPublisherThreadPoolContainer)HystrixMetricsPublisherFactory
.createOrRetrievePublisherForThreadPool(threadPoolKey, null, null);
ThreadPoolExecutor threadPoolExecutor = hystrixMetricsPublisherThreadPool.getHystrixThreadPoolMetrics().getThreadPool();

//assert that both HystrixThreadPools share the same ThreadPoolExecutor as the one in HystrixMetricsPublisherThreadPool
assertTrue(threadPoolExecutor.equals(poolOne.getExecutor()) && threadPoolExecutor.equals(poolTwo.getExecutor()));
assertFalse(threadPoolExecutor.isShutdown());

//Now the HystrixThreadPool ALWAYS has the same reference to the ThreadPoolExecutor so that it no longer matters which
//wins to be inserted into the HystrixThreadPool.Factory.threadPools cache.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
import org.junit.Test;

import com.netflix.hystrix.HystrixCircuitBreaker;
Expand All @@ -17,6 +18,11 @@
import com.netflix.hystrix.HystrixThreadPoolProperties;

public class HystrixMetricsPublisherFactoryTest {
@Before
public void reset() {
HystrixMetricsPublisherFactory.reset();
}

/**
* Assert that we only call a publisher once for a given Command or ThreadPool key.
*/
Expand Down