Skip to content

Commit

Permalink
Upgrade gradle, fix test logging and int test CPU usage (#1235)
Browse files Browse the repository at this point in the history
- Upgrade to gradle 5.2.1. As part of this, I have separated the
  integration tests into their own task so that they can be run
  independently. The "allTest" target will do what the "test" target
  used to do.
- Suppress some especially noisy logs in integration tests. These log
  messages were firing continuously when the node-down scenarios were
  tested.
- Configure MockCluster replication threads to do a 100 ms sleep per
  iteration. This greatly reduces the CPU requirements for running the
  integration tests and allows us to run them on travis-ci.
- Set up travis to run integration tests too.
  • Loading branch information
cgtz authored and jsjtzyy committed Aug 14, 2019
1 parent db6d337 commit 62d752a
Show file tree
Hide file tree
Showing 14 changed files with 426 additions and 312 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: java
dist: trusty
script:
- ./gradlew build && ./gradlew codeCoverageReport
- ./gradlew build codeCoverageReport
sudo: required
jdk:
- oraclejdk8
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* A blocking channel that is used to communicate with a server
*/
public class BlockingChannel implements ConnectedChannel {
protected static final Logger logger = LoggerFactory.getLogger(BlockingChannel.class);
protected final String host;
protected final int port;
protected final int readBufferSize;
Expand All @@ -38,7 +39,6 @@ public class BlockingChannel implements ConnectedChannel {
protected InputStream readChannel = null;
protected WritableByteChannel writeChannel = null;
protected Object lock = new Object();
protected Logger logger = LoggerFactory.getLogger(getClass());
private SocketChannel channel = null;

public BlockingChannel(String host, int port, int readBufferSize, int writeBufferSize, int readTimeoutMs,
Expand Down Expand Up @@ -144,4 +144,4 @@ public String getRemoteHost() {
public int getRemotePort() {
return port;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,247 +22,25 @@
import com.github.ambry.config.ConnectionPoolConfig;
import com.github.ambry.config.SSLConfig;
import java.io.IOException;
import java.net.SocketException;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


class BlockingChannelInfo {
private final ArrayBlockingQueue<BlockingChannel> blockingChannelAvailableConnections;
private final ArrayBlockingQueue<BlockingChannel> blockingChannelActiveConnections;
private final AtomicInteger numberOfConnections;
private final ConnectionPoolConfig config;
private final ReadWriteLock rwlock;
private final Object lock;
private final String host;
private final Port port;
private final Logger logger = LoggerFactory.getLogger(getClass());
protected Gauge<Integer> availableConnections;
private Gauge<Integer> activeConnections;
private Gauge<Integer> totalNumberOfConnections;
private int maxConnectionsPerHostPerPort;
private final SSLSocketFactory sslSocketFactory;
private final SSLConfig sslConfig;
private final MetricRegistry registry;

public BlockingChannelInfo(ConnectionPoolConfig config, String host, Port port, MetricRegistry registry,
SSLSocketFactory sslSocketFactory, SSLConfig sslConfig) {
this.config = config;
this.port = port;
this.registry = registry;
if (port.getPortType() == PortType.SSL) {
maxConnectionsPerHostPerPort = config.connectionPoolMaxConnectionsPerPortSSL;
} else {
maxConnectionsPerHostPerPort = config.connectionPoolMaxConnectionsPerPortPlainText;
}
this.blockingChannelAvailableConnections = new ArrayBlockingQueue<BlockingChannel>(maxConnectionsPerHostPerPort);
this.blockingChannelActiveConnections = new ArrayBlockingQueue<BlockingChannel>(maxConnectionsPerHostPerPort);
this.numberOfConnections = new AtomicInteger(0);
this.rwlock = new ReentrantReadWriteLock();
this.lock = new Object();
this.host = host;
this.sslSocketFactory = sslSocketFactory;
this.sslConfig = sslConfig;

availableConnections = blockingChannelAvailableConnections::size;
registry.register(
MetricRegistry.name(BlockingChannelInfo.class, host + "-" + port.getPort() + "-availableConnections"),
availableConnections);

activeConnections = blockingChannelActiveConnections::size;
registry.register(
MetricRegistry.name(BlockingChannelInfo.class, host + "-" + port.getPort() + "-activeConnections"),
activeConnections);

totalNumberOfConnections = numberOfConnections::intValue;
registry.register(
MetricRegistry.name(BlockingChannelInfo.class, host + "-" + port.getPort() + "-totalNumberOfConnections"),
totalNumberOfConnections);

logger.info("Starting blocking channel info for host {} and port {}", host, port.getPort());
}

public void releaseBlockingChannel(BlockingChannel blockingChannel) {
rwlock.readLock().lock();
try {
if (blockingChannelActiveConnections.remove(blockingChannel)) {
blockingChannelAvailableConnections.add(blockingChannel);
logger.trace(
"Adding connection to {}:{} back to pool. Current available connections {} Current active connections {}",
blockingChannel.getRemoteHost(), blockingChannel.getRemotePort(),
blockingChannelAvailableConnections.size(), blockingChannelActiveConnections.size());
} else {
logger.error("Tried to add invalid connection. Channel does not belong in the active queue. Host {} port {}"
+ " channel host {} channel port {}", host, port.getPort(), blockingChannel.getRemoteHost(),
blockingChannel.getRemotePort());
}
} finally {
rwlock.readLock().unlock();
}
}

public BlockingChannel getBlockingChannel(long timeoutInMs)
throws InterruptedException, ConnectionPoolTimeoutException {
rwlock.readLock().lock();
try {
// check if the max connections for this queue has reached or if there are any connections available
// in the available queue. The check in available queue is approximate and it could not have any
// connections when polled. In this case we just depend on an existing connection being placed back in
// the available pool
if (numberOfConnections.get() == maxConnectionsPerHostPerPort || blockingChannelAvailableConnections.size() > 0) {
BlockingChannel channel = blockingChannelAvailableConnections.poll(timeoutInMs, TimeUnit.MILLISECONDS);
if (channel != null) {
blockingChannelActiveConnections.add(channel);
logger.trace("Returning connection to " + channel.getRemoteHost() + ":" + channel.getRemotePort());
return channel;
} else if (numberOfConnections.get() == maxConnectionsPerHostPerPort) {
logger.error("Timed out trying to get a connection for host {} and port {}", host, port.getPort());
throw new ConnectionPoolTimeoutException(
"Could not get a connection to host " + host + " and port " + port.getPort());
}
}
synchronized (lock) {
// if the number of connections created for this host and port is less than the max allowed
// connections, we create a new one and add it to the available queue
if (numberOfConnections.get() < maxConnectionsPerHostPerPort) {
logger.trace("Planning to create a new connection for host {} and port {} ", host, port.getPort());
BlockingChannel channel = getBlockingChannelBasedOnPortType(host, port.getPort());
channel.connect();
numberOfConnections.incrementAndGet();
logger.trace("Created a new connection for host {} and port {}. Number of connections {}", host, port,
numberOfConnections.get());
blockingChannelActiveConnections.add(channel);
return channel;
}
}
BlockingChannel channel = blockingChannelAvailableConnections.poll(timeoutInMs, TimeUnit.MILLISECONDS);
if (channel == null) {
logger.error("Timed out trying to get a connection for host {} and port {}", host, port);
throw new ConnectionPoolTimeoutException(
"Could not get a connection to host " + host + " and port " + port.getPort());
}
blockingChannelActiveConnections.add(channel);
return channel;
} catch (SocketException e) {
logger.error("Socket exception when trying to connect to remote host {} and port {}", host, port.getPort());
throw new ConnectionPoolTimeoutException(
"Socket exception when trying to connect to remote host " + host + " port " + port.getPort(), e);
} catch (IOException e) {
logger.error("IOException when trying to connect to the remote host {} and port {}", host, port.getPort());
throw new ConnectionPoolTimeoutException(
"IOException when trying to connect to remote host " + host + " port " + port.getPort(), e);
} finally {
rwlock.readLock().unlock();
}
}

/**
* Returns BlockingChannel or SSLBlockingChannel depending on whether the port type is PlainText or SSL
* @param host upon which connection has to be established
* @param port upon which connection has to be established
* @return BlockingChannel
*/
private BlockingChannel getBlockingChannelBasedOnPortType(String host, int port) {
BlockingChannel channel = null;
if (this.port.getPortType() == PortType.PLAINTEXT) {
channel = new BlockingChannel(host, port, config.connectionPoolReadBufferSizeBytes,
config.connectionPoolWriteBufferSizeBytes, config.connectionPoolReadTimeoutMs,
config.connectionPoolConnectTimeoutMs);
} else if (this.port.getPortType() == PortType.SSL) {
channel = new SSLBlockingChannel(host, port, registry, config.connectionPoolReadBufferSizeBytes,
config.connectionPoolWriteBufferSizeBytes, config.connectionPoolReadTimeoutMs,
config.connectionPoolConnectTimeoutMs, sslSocketFactory, sslConfig);
}
return channel;
}

public void destroyBlockingChannel(BlockingChannel blockingChannel) {
rwlock.readLock().lock();
try {
boolean changed = blockingChannelActiveConnections.remove(blockingChannel);
if (!changed) {
logger.error("Invalid connection being destroyed. "
+ "Channel does not belong to this queue. queue host {} port {} channel host {} port {}", host,
port.getPort(), blockingChannel.getRemoteHost(), blockingChannel.getRemotePort());
throw new IllegalArgumentException("Invalid connection. Channel does not belong to this queue");
}
blockingChannel.disconnect();
// we ensure we maintain the current count of connections to the host to avoid synchronization across threads
// to create the connection
BlockingChannel channel =
getBlockingChannelBasedOnPortType(blockingChannel.getRemoteHost(), blockingChannel.getRemotePort());
channel.connect();
logger.trace("Destroying connection and adding new connection for host {} port {}", host, port.getPort());
blockingChannelAvailableConnections.add(channel);
} catch (Exception e) {
logger.error("Connection failure to remote host {} and port {} when destroying and recreating the connection",
host, port.getPort());
synchronized (lock) {
// decrement the number of connections to the host and port. we were not able to maintain the count
numberOfConnections.decrementAndGet();
// at this point we are good to clean up the available connections since re-creation failed
do {
BlockingChannel channel = blockingChannelAvailableConnections.poll();
if (channel == null) {
break;
}
channel.disconnect();
numberOfConnections.decrementAndGet();
} while (true);
}
} finally {
rwlock.readLock().unlock();
}
}

/**
* @return the number of connections with this BlockingChannelInfo
*/
public int getNumberOfConnections() {
return this.numberOfConnections.intValue();
}

public void cleanup() {
rwlock.writeLock().lock();
logger.info("Cleaning all active and available connections for host {} and port {}", host, port.getPort());
try {
for (BlockingChannel channel : blockingChannelActiveConnections) {
channel.disconnect();
}
blockingChannelActiveConnections.clear();
for (BlockingChannel channel : blockingChannelAvailableConnections) {
channel.disconnect();
}
blockingChannelAvailableConnections.clear();
numberOfConnections.set(0);
logger.info("Cleaning completed for all active and available connections for host {} and port {}", host,
port.getPort());
} finally {
rwlock.writeLock().unlock();
}
}
}

/**
* A connection pool that uses BlockingChannel as the underlying connection.
* It is responsible for all the connection management. It helps to
* checkout a new connection, checkin an existing connection that has been
* checked out and destroy a connection in the case of an error
*/
public final class BlockingChannelConnectionPool implements ConnectionPool {

private static final Logger logger = LoggerFactory.getLogger(BlockingChannelConnectionPool.class);
private final Map<String, BlockingChannelInfo> connections;
private final ConnectionPoolConfig config;
private final Logger logger = LoggerFactory.getLogger(getClass());
private final MetricRegistry registry;
private final Timer connectionCheckOutTime;
private final Timer connectionCheckInTime;
Expand Down
Loading

0 comments on commit 62d752a

Please sign in to comment.