Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument circuit breaker in the connection pool #1693

Merged
merged 2 commits into from
Nov 3, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.histogram.PercentileTimer;
import com.netflix.spectator.api.patterns.PolledMeter;
import com.netflix.zuul.discovery.DiscoveryResult;
import com.netflix.zuul.discovery.DynamicServerResolver;
import com.netflix.zuul.discovery.ResolverResult;
Expand Down Expand Up @@ -62,12 +63,12 @@
public class DefaultClientChannelManager implements ClientChannelManager {
private static final Logger LOG = LoggerFactory.getLogger(DefaultClientChannelManager.class);

public static final String METRIC_PREFIX = "connectionpool";
public static final String METRIC_PREFIX = "connectionpool_";

private final Resolver<DiscoveryResult> dynamicServerResolver;
private final ConnectionPoolConfig connPoolConfig;
private final IClientConfig clientConfig;
private final Registry spectatorRegistry;
private final Registry registry;

private final OriginName originName;

Expand All @@ -89,6 +90,8 @@ public class DefaultClientChannelManager implements ClientChannelManager {
private final Counter connTakenFromPoolIsNotOpen;
private final Counter maxConnsPerHostExceededCounter;
private final Counter closeWrtBusyConnCounter;
private final Counter circuitBreakerClose;

private final PercentileTimer connEstablishTimer;
private final AtomicInteger connsInPool;
private final AtomicInteger connsInUse;
Expand All @@ -100,55 +103,49 @@ public class DefaultClientChannelManager implements ClientChannelManager {

public static final String IDLE_STATE_HANDLER_NAME = "idleStateHandler";

public DefaultClientChannelManager(OriginName originName, IClientConfig clientConfig, Registry spectatorRegistry) {
this(originName, clientConfig, new DynamicServerResolver(clientConfig), spectatorRegistry);
public DefaultClientChannelManager(OriginName originName, IClientConfig clientConfig, Registry registry) {
this(originName, clientConfig, new DynamicServerResolver(clientConfig), registry);
}

public DefaultClientChannelManager(
OriginName originName,
IClientConfig clientConfig,
Resolver<DiscoveryResult> resolver,
Registry spectatorRegistry) {
OriginName originName, IClientConfig clientConfig, Resolver<DiscoveryResult> resolver, Registry registry) {
this.originName = Objects.requireNonNull(originName, "originName");
this.dynamicServerResolver = resolver;

String metricId = originName.getMetricId();

this.clientConfig = clientConfig;
this.spectatorRegistry = spectatorRegistry;
this.registry = registry;
this.perServerPools = new ConcurrentHashMap<>(200);

this.connPoolConfig = new ConnectionPoolConfigImpl(originName, this.clientConfig);

this.createNewConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_create", metricId);
this.createConnSucceededCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_create_success", metricId);
this.createConnFailedCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_create_fail", metricId);

this.closeConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_close", metricId);
this.closeAbovePoolHighWaterMarkCounter =
SpectatorUtils.newCounter(METRIC_PREFIX + "_closeAbovePoolHighWaterMark", metricId);
this.closeExpiredConnLifetimeCounter =
SpectatorUtils.newCounter(METRIC_PREFIX + "_closeExpiredConnLifetime", metricId);
this.requestConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_request", metricId);
this.reuseConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_reuse", metricId);
this.releaseConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_release", metricId);
this.alreadyClosedCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_alreadyClosed", metricId);
this.connTakenFromPoolIsNotOpen = SpectatorUtils.newCounter(METRIC_PREFIX + "_fromPoolIsClosed", metricId);
this.maxConnsPerHostExceededCounter =
SpectatorUtils.newCounter(METRIC_PREFIX + "_maxConnsPerHostExceeded", metricId);
this.closeWrtBusyConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_closeWrtBusyConnCounter", metricId);
this.createNewConnCounter = newCounter("create");
this.createConnSucceededCounter = newCounter("create_success");
this.createConnFailedCounter = newCounter("create_fail");

this.closeConnCounter = newCounter("close");
this.closeAbovePoolHighWaterMarkCounter = newCounter("closeAbovePoolHighWaterMark");
this.closeExpiredConnLifetimeCounter = newCounter("closeExpiredConnLifetime");
this.requestConnCounter = newCounter("request");
this.reuseConnCounter = newCounter("reuse");
this.releaseConnCounter = newCounter("release");
this.alreadyClosedCounter = newCounter("alreadyClosed");
this.connTakenFromPoolIsNotOpen = newCounter("fromPoolIsClosed");
this.maxConnsPerHostExceededCounter = newCounter("maxConnsPerHostExceeded");
this.closeWrtBusyConnCounter = newCounter("closeWrtBusyConnCounter");
this.circuitBreakerClose = newCounter("closeCircuitBreaker");

this.connEstablishTimer = PercentileTimer.get(
spectatorRegistry, spectatorRegistry.createId(METRIC_PREFIX + "_createTiming", "id", metricId));
this.connsInPool = SpectatorUtils.newGauge(METRIC_PREFIX + "_inPool", metricId, new AtomicInteger());
this.connsInUse = SpectatorUtils.newGauge(METRIC_PREFIX + "_inUse", metricId, new AtomicInteger());
registry, registry.createId(METRIC_PREFIX + "createTiming", "id", originName.getMetricId()));
this.connsInPool = newGauge("inPool");
this.connsInUse = newGauge("inUse");
}

@Override
public void init() {
dynamicServerResolver.setListener(new ServerPoolListener());
// Load channel initializer and conn factory.
// We don't do this within the constructor because some subclass may not be initialized until post-construct.
this.channelInitializer = createChannelInitializer(clientConfig, connPoolConfig, spectatorRegistry);
this.channelInitializer = createChannelInitializer(clientConfig, connPoolConfig, registry);
this.clientConnFactory = createNettyClientConnectionFactory(connPoolConfig, channelInitializer);
}

Expand Down Expand Up @@ -242,6 +239,7 @@ public boolean release(final PooledConnection conn) {
LOG.debug(
"[{}] closing conn, server circuit breaker tripped",
conn.getChannel().id());
circuitBreakerClose.increment();
// Don't put conns for currently circuit-tripped servers back into the pool.
conn.setInPool(false);
conn.close();
Expand Down Expand Up @@ -492,4 +490,15 @@ static SocketAddress pickAddressInternal(ResolverResult chosenServer, @Nullable
protected SocketAddress pickAddress(DiscoveryResult chosenServer) {
return pickAddressInternal(chosenServer, connPoolConfig.getOriginName());
}

private AtomicInteger newGauge(String name) {
return PolledMeter.using(registry)
.withName(METRIC_PREFIX + name)
.withTag("id", originName.getMetricId())
.monitorValue(new AtomicInteger());
}

private Counter newCounter(String name) {
return registry.counter(METRIC_PREFIX + name, "id", originName.getMetricId());
}
}