Skip to content

Commit

Permalink
Maintain connection low watermark in NetworkClient
Browse files Browse the repository at this point in the history
If prewarming is enabled, maintain a minimum number of connections to
each data node. The goal of this change is to allow the frontend to keep
connection pools adequately sized even when servers are restarted or
other disconnections occur.

To track the connection pools that are below the watermark, some of the
connection establishment logic was moved from NetworkClient to
ConnectionTracker.

Upgrade metrics library from codahale to dropwizard as the library was
renamed and dropwizard has the newest updates.
  • Loading branch information
cgtz committed Jun 12, 2019
1 parent 984fdf4 commit bb6ddb6
Show file tree
Hide file tree
Showing 21 changed files with 391 additions and 265 deletions.
2 changes: 1 addition & 1 deletion HEADER
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright 2016 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
52 changes: 0 additions & 52 deletions ambry-api/src/main/java/com.github.ambry/config/MetricsConfig.java

This file was deleted.

7 changes: 6 additions & 1 deletion ambry-api/src/main/java/com.github.ambry/network/Port.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package com.github.ambry.network;

/**
* Represents a port containing port number and {@PortType}
* Represents a port containing port number and {@link PortType}
*/
public class Port {
private final int port;
Expand Down Expand Up @@ -51,4 +51,9 @@ public boolean equals(Object o) {
Port p = (Port) o;
return p.port == port && p.type.equals(type);
}

@Override
public int hashCode() {
return Integer.hashCode(port);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
*/
package com.github.ambry.cloud;

import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jmx.JmxReporter;
import com.github.ambry.clustermap.ClusterAgentsFactory;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.DataNodeId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@
*/
package com.github.ambry.network;

import com.github.ambry.utils.Pair;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand All @@ -25,28 +31,27 @@
*/

class ConnectionTracker {
private final HashMap<String, HostPortPoolManager> hostPortToPoolManager;
private final HashMap<String, HostPortPoolManager> connectionIdToPoolManager;
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionTracker.class);
private final HashMap<Pair<String, Port>, HostPortPoolManager> hostPortToPoolManager = new HashMap<>();
private final HashMap<String, HostPortPoolManager> connectionIdToPoolManager = new HashMap<>();
private final HashSet<HostPortPoolManager> poolManagersBelowWatermark = new HashSet<>();
private int totalManagedConnectionsCount = 0;
private final int maxConnectionsPerPortPlainText;
private final int maxConnectionsPerPortSsl;
private int totalManagedConnectionsCount;

/**
* Instantiates a ConnectionTracker
* @param maxConnectionsPerPortPlainText the connection pool limit for plain text connections to a (host, port)
* @param maxConnectionsPerPortSsl the connection pool limit for ssl connections to a (host, port)
*/
ConnectionTracker(int maxConnectionsPerPortPlainText, int maxConnectionsPerPortSsl) {
hostPortToPoolManager = new HashMap<String, HostPortPoolManager>();
connectionIdToPoolManager = new HashMap<String, HostPortPoolManager>();
totalManagedConnectionsCount = 0;
this.maxConnectionsPerPortPlainText = maxConnectionsPerPortPlainText;
this.maxConnectionsPerPortSsl = maxConnectionsPerPortSsl;
}

/**
* Returns true if a new connection may be created for the given hostPort, that is if the number of connections for
* the given hostPort has not reached the pool limit.
* the given (host, port) has not reached the pool limit.
* @param host the host associated with this check.
* @param port the port associated with this check.
* @return true if a new connection may be created, false otherwise.
Expand All @@ -56,21 +61,85 @@ boolean mayCreateNewConnection(String host, Port port) {
}

/**
* Start tracking a new connection id associated with the given host and port. Note that this connection will not
* be made available for checking out until a {@link #checkInConnection(String)} is called on it.
* Configure the connection tracker to keep a specified percentage of connections to this (host, port) ready for use.
* @param host the hostname.
* @param port the port number.
* @param warmUpPercentage percentage of max connections to this (host, port) that should be kept ready for use.
*/
void enableConnectionWarmUp(String host, Port port, int warmUpPercentage) {
HostPortPoolManager hostPortPoolManager = getHostPortPoolManager(host, port);
hostPortPoolManager.setLowWatermark(warmUpPercentage * hostPortPoolManager.poolLimit / 100);
if (!hostPortPoolManager.hasReachedLowWatermark()) {
poolManagersBelowWatermark.add(hostPortPoolManager);
}
}

/**
* For (host, port) pools that are below the connection watermark, initiate new connections to each host until they
* meet
* @param connectionFactory the {@link ConnectionFactory} for interfacing with the networking layer.
* @return the number of connections initiated.
*/
int replenishConnections(ConnectionFactory connectionFactory) {
int connectionsInitiated = 0;
Iterator<HostPortPoolManager> iter = poolManagersBelowWatermark.iterator();
while (iter.hasNext()) {
HostPortPoolManager poolManager = iter.next();
try {
while (!poolManager.hasReachedLowWatermark()) {
connectAndTrack(connectionFactory, poolManager.host, poolManager.port, false);
connectionsInitiated++;
}
iter.remove();
} catch (IOException e) {
LOGGER.warn("Encountered exception while replenishing connections to {}:{}.", poolManager.host,
poolManager.port.getPort(), e);
}
}
return connectionsInitiated;
}

/**
* Initiate a new connection using the provided {@link ConnectionFactory} and start tracking a new connection id
* associated with the given host and port. Note that this connection will not be made available for checking out
* until a {@link #checkInConnection} is called on it.
* @param connectionFactory the {@link ConnectionFactory} for interfacing with the networking layer.
* @param host the host to which this connection belongs.
* @param port the port on the host to which this connection belongs.
* @return the connection id of the connection returned by {@link ConnectionFactory#connect}.
*/
String connectAndTrack(ConnectionFactory connectionFactory, String host, Port port) throws IOException {
return connectAndTrack(connectionFactory, host, port, true);
}

/**
* @see #connectAndTrack(ConnectionFactory, String, Port)
* @param connectionFactory the {@link ConnectionFactory} for interfacing with the networking layer.
* @param host the host to which this connection belongs.
* @param port the port on the host to which this connection belongs.
* @param connId the connection id of the connection.
* @param editPoolManagersBelowWatermark true to allow this method to edit {@link #poolManagersBelowWatermark}.
* Otherwise, removal of hosts that have reached the low watermark will be
* deferred. This must be false for methods that use an {@link Iterator} to
* iterate through {@link #poolManagersBelowWatermark} to avoid
* {@link java.util.ConcurrentModificationException}s.
* @return the connection id of the connection returned by {@link ConnectionFactory#connect}.
*/
void startTrackingInitiatedConnection(String host, Port port, String connId) {
private String connectAndTrack(ConnectionFactory connectionFactory, String host, Port port,
boolean editPoolManagersBelowWatermark) throws IOException {
String connId = connectionFactory.connect(host, port);
HostPortPoolManager hostPortPoolManager = getHostPortPoolManager(host, port);
hostPortPoolManager.incrementPoolCount();
connectionIdToPoolManager.put(connId, hostPortPoolManager);
totalManagedConnectionsCount++;
if (editPoolManagersBelowWatermark && hostPortPoolManager.hasReachedLowWatermark()) {
poolManagersBelowWatermark.remove(hostPortPoolManager);
}
return connId;
}


/**
* Attempts to check out an existing connection to the hostPort provided, or returns null if none available.
* Attempts to check out an existing connection to the (host, port) provided, or returns null if none available.
* @param host The host to connect to.
* @param port The port on the host to connect to.
* @return connectionId, if there is one available to use, null otherwise.
Expand Down Expand Up @@ -104,6 +173,9 @@ void removeConnection(String connectionId) {
}
hostPortPoolManager.removeConnection(connectionId);
totalManagedConnectionsCount--;
if (!hostPortPoolManager.hasReachedLowWatermark()) {
poolManagersBelowWatermark.add(hostPortPoolManager);
}
}

/**
Expand All @@ -115,7 +187,7 @@ int getTotalConnectionsCount() {
}

/**
* Return the total available connections across all hostPortPoolManagers.
* Return the total available connections across all {@link HostPortPoolManager}s.
* @return total established and available connections.
*/
int getAvailableConnectionsCount() {
Expand All @@ -134,54 +206,51 @@ int getAvailableConnectionsCount() {
* @return the HostPortPoolManager for the associated (host, port) pair.
*/
private HostPortPoolManager getHostPortPoolManager(String host, Port port) {
String lookupStr = host + ":" + Integer.toString(port.getPort());
HostPortPoolManager poolManager = hostPortToPoolManager.get(lookupStr);
if (poolManager == null) {
poolManager = new HostPortPoolManager(
port.getPortType() == PortType.SSL ? maxConnectionsPerPortSsl : maxConnectionsPerPortPlainText);
hostPortToPoolManager.put(lookupStr, poolManager);
}
return poolManager;
return hostPortToPoolManager.computeIfAbsent(new Pair<>(host, port), k -> new HostPortPoolManager(host, port,
port.getPortType() == PortType.SSL ? maxConnectionsPerPortSsl : maxConnectionsPerPortPlainText));
}

/**
* Returns max number of connections allowed for a plain text port.
* {@link HostPortPoolManager} manages all the connections to a specific (host, port) pair. The
* {@link ConnectionTracker} creates one for every (host, port) pair it knows of.
*/
int getMaxConnectionsPerPortPlainText() {
return maxConnectionsPerPortPlainText;
}

/**
* Returns max number of connections allowed for a ssl port.
*/
int getMaxConnectionsPerPortSsl() {
return maxConnectionsPerPortSsl;
}
/**
* HostPortPoolManager manages all the connections to a specific (host,
* port) pair. The {@link ConnectionTracker} creates one for every (host, port) pair it knows of.
*/
private class HostPortPoolManager {
private final int maxConnectionsToHostPort;
private final LinkedList<String> availableConnections;
private int poolCount;
private static class HostPortPoolManager {
private final LinkedList<String> availableConnections = new LinkedList<>();
private int lowWatermark = 0;
private int poolCount = 0;
final String host;
final Port port;
final int poolLimit;

/**
* Instantiate a HostPortPoolManager
* @param poolLimit the max connections allowed for this hostPort.
* @param host the destination host for this pool.
* @param port the destination port for this pool.
* @param poolLimit the max connections allowed for this (host, port).
*/
HostPortPoolManager(int poolLimit) {
poolCount = 0;
maxConnectionsToHostPort = poolLimit;
availableConnections = new LinkedList<String>();
HostPortPoolManager(String host, Port port, int poolLimit) {
this.host = host;
this.port = port;
this.poolLimit = poolLimit;
}

boolean hasReachedLowWatermark() {
return poolCount >= lowWatermark;
}

/**
* Return true if this manager has reached the pool limit.
* @return true if this manager has reached the pool limit
*/
boolean hasReachedPoolLimit() {
return poolCount == maxConnectionsToHostPort;
return poolCount == poolLimit;
}

/**
* @param lowWatermark the minimum number of connections to this (host, port) to keep ready for use.
*/
public void setLowWatermark(int lowWatermark) {
this.lowWatermark = Math.min(poolLimit, lowWatermark);
}

/**
Expand Down Expand Up @@ -218,11 +287,26 @@ void removeConnection(String connectionId) {
}

/**
* Return the number of available connections to this hostPort
* Return the number of available connections to this (host, port)
* @return number of available connections
*/
int getAvailableConnectionsCount() {
return availableConnections.size();
}
}

/**
* Used to signal to the networking layer to initiate a new connection.
*/
interface ConnectionFactory {
/**
* Initiate a new connection to the given (host, port). This method can return before the connection is ready for
* sending requests. Once it is ready, {@link #checkInConnection} should be called.
* @param host the hostname to connect to.
* @param port the port to connect to.
* @return a unique connection ID to represent the (future) connection.
* @throws IOException if the connection could not be initiated.
*/
String connect(String host, Port port) throws IOException;
}
}
Loading

0 comments on commit bb6ddb6

Please sign in to comment.