Skip to content

Commit

Permalink
Create executor pool to take over network read/write tasks. (#1227)
Browse files Browse the repository at this point in the history
SSL transmission latency was high in recent performance test, especially in PUT.
The reason is selector.poll() needs to iterate all events and do SSL encryption + network IO for each of them. This increased round trip time dramatically in the queuing system.
The change introduced an executor pool to take over event handling and selector.poll() blocks until events done.
In the performance test, 7MB blobs were sent at 20QPS, PUT latency P95 was decreased to 380ms from 1.4kms with this change.
  • Loading branch information
zzmao authored and cgtz committed Aug 13, 2019
1 parent 89b82fb commit db6d337
Show file tree
Hide file tree
Showing 10 changed files with 232 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ public class NetworkConfig {
@Default("false")
public final boolean networkClientEnableConnectionReplenishment;

/**
* The size of the pool if selector executor pool is employed. When size is 0, executor pool won't be used.
*/
@Config("selector.executor.pool.size")
@Default("4")
public final int selectorExecutorPoolSize;

public NetworkConfig(VerifiableProperties verifiableProperties) {

numIoThreads = verifiableProperties.getIntInRange("num.io.threads", 8, 1, Integer.MAX_VALUE);
Expand All @@ -88,5 +95,7 @@ public NetworkConfig(VerifiableProperties verifiableProperties) {
queuedMaxRequests = verifiableProperties.getIntInRange("queued.max.requests", 500, 1, Integer.MAX_VALUE);
networkClientEnableConnectionReplenishment =
verifiableProperties.getBoolean("network.client.enable.connection.replenishment", false);
selectorExecutorPoolSize =
verifiableProperties.getIntInRange("selector.executor.pool.size", 4, 0, Integer.MAX_VALUE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public NetworkClientFactory(NetworkMetrics networkMetrics, NetworkConfig network
* @throws IOException if the {@link Selector} could not be instantiated.
*/
public NetworkClient getNetworkClient() throws IOException {
Selector selector = new Selector(networkMetrics, time, sslFactory);
Selector selector = new Selector(networkMetrics, time, sslFactory, networkConfig.selectorExecutorPoolSize);
return new NetworkClient(selector, networkConfig, networkMetrics, maxConnectionsPerPortPlainText,
maxConnectionsPerPortSsl, connectionCheckoutTimeoutMs, time);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ private int readFromSocketChannel() throws IOException {
private boolean flush(ByteBuffer buf) throws IOException {
int remaining = buf.remaining();
if (remaining > 0) {
long startNs = SystemTime.getInstance().nanoseconds();
int written = socketChannel.write(buf);
logger.trace("Flushed {} bytes in {} ns", written, SystemTime.getInstance().nanoseconds() - startNs);
return written >= remaining;
}
return true;
Expand Down Expand Up @@ -531,13 +533,12 @@ public int write(ByteBuffer src) throws IOException {
int written = 0;
while (src.remaining() != 0) {
netWriteBuffer.clear();
long startTimeMs = SystemTime.getInstance().milliseconds();
long startTimeNs = SystemTime.getInstance().nanoseconds();
SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer);
long encryptionTimeMs = SystemTime.getInstance().milliseconds() - startTimeMs;
logger.trace("SSL encryption time: {} ms for {} bytes", encryptionTimeMs, wrapResult.bytesConsumed());
long encryptionTimeNs = SystemTime.getInstance().nanoseconds() - startTimeNs;
logger.trace("SSL encryption time: {} ns for {} bytes", encryptionTimeNs, wrapResult.bytesConsumed());
if (wrapResult.bytesConsumed() > 0) {
metrics.sslEncryptionTimeInUsPerKB.update(
TimeUnit.MILLISECONDS.toMicros(encryptionTimeMs) * 1024 / wrapResult.bytesConsumed());
metrics.sslEncryptionTimeInUsPerKB.update(encryptionTimeNs / wrapResult.bytesConsumed());
}
netWriteBuffer.flip();
//handle ssl renegotiation
Expand Down
191 changes: 173 additions & 18 deletions ambry-network/src/main/java/com.github.ambry.network/Selector.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -81,11 +85,12 @@ public class Selector implements Selectable {
private final AtomicLong idGenerator;
private final AtomicLong numActiveConnections;
private final SSLFactory sslFactory;
private final ExecutorService executorPool;

/**
* Create a new selector
*/
public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) throws IOException {
public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory, int executorPoolSize) throws IOException {
this.nioSelector = java.nio.channels.Selector.open();
this.time = time;
this.keyMap = new HashMap<>();
Expand All @@ -99,6 +104,11 @@ public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) throws
idGenerator = new AtomicLong(0);
numActiveConnections = new AtomicLong(0);
unreadyConnections = new HashSet<>();
if (executorPoolSize > 0) {
executorPool = Executors.newFixedThreadPool(executorPoolSize);
} else {
executorPool = null;
}
metrics.registerSelectorActiveConnections(numActiveConnections);
metrics.registerSelectorUnreadyConnections(unreadyConnections);
}
Expand Down Expand Up @@ -291,13 +301,24 @@ public void poll(long timeoutMs) throws IOException {
* completed I/O.
*
* @param timeoutMs The amount of time to wait, in milliseconds. If negative, wait indefinitely.
* @param sends The list of new sends to begin
* @param sends The list of new sends to initiate.
*
* @throws IOException If a send is given for which we have no existing connection or for which there is
* already an in-progress send
*/
@Override
public void poll(long timeoutMs, List<NetworkSend> sends) throws IOException {
if (executorPool != null) {
pollWithExecutorPool(timeoutMs, sends);
} else {
pollOnMainThread(timeoutMs, sends);
}
}

/**
* Process read/write events on current thread.
*/
private void pollOnMainThread(long timeoutMs, List<NetworkSend> sends) throws IOException {
clear();

// register for write interest on any new sends
Expand Down Expand Up @@ -340,21 +361,26 @@ public void poll(long timeoutMs, List<NetworkSend> sends) throws IOException {
}

if (key.isReadable() && transmission.ready()) {
read(key, transmission);
NetworkReceive networkReceive = read(key, transmission);
if (networkReceive == null) {
// Exception happened in read.
close(key);
} else if (networkReceive.getReceivedBytes().isReadComplete()) {
this.completedReceives.add(networkReceive);
}
} else if (key.isWritable() && transmission.ready()) {
write(key, transmission);
NetworkSend networkSend = write(key, transmission);
if (networkSend == null) {
// Exception happened in write.
close(key);
} else if (networkSend.getPayload().isSendComplete()) {
this.completedSends.add(networkSend);
}
} else if (!key.isValid()) {
close(key);
}
} catch (IOException e) {
String socketDescription = socketDescription(channel(key));
if (e instanceof EOFException || e instanceof ConnectException) {
metrics.selectorDisconnectedErrorCount.inc();
logger.error("Connection {} disconnected", socketDescription, e);
} else {
metrics.selectorIOErrorCount.inc();
logger.warn("Error in I/O with connection to {}", socketDescription, e);
}
handleReadWriteIOException(e, key);
close(key);
} catch (Exception e) {
metrics.selectorKeyOperationErrorCount.inc();
Expand All @@ -363,7 +389,111 @@ public void poll(long timeoutMs, List<NetworkSend> sends) throws IOException {
}
}
checkUnreadyConnectionsStatus();
this.metrics.selectorIOCount.inc();
this.metrics.selectorIOCount.inc(readyKeys);
this.metrics.selectorIOTime.update(time.milliseconds() - endSelect);
}
disconnected.addAll(closedConnections);
closedConnections.clear();
}

/**
* Use {@link ExecutorService} to process read/write events.
*/
private void pollWithExecutorPool(long timeoutMs, List<NetworkSend> sends) throws IOException {
List<Future<NetworkSend>> completedSendsFutures = new ArrayList<>();
List<Future<NetworkReceive>> completedReceivesFutures = new ArrayList<>();
Set<SelectionKey> readWriteKeySet = new HashSet<>();
clear();

// register for write interest on any new sends
if (sends != null) {
for (NetworkSend networkSend : sends) {
send(networkSend);
}
}

// check ready keys
long startSelect = time.milliseconds();
int readyKeys = select(timeoutMs);
this.metrics.selectorSelectCount.inc();

if (readyKeys > 0) {
long endSelect = time.milliseconds();
this.metrics.selectorSelectTime.update(endSelect - startSelect);
Set<SelectionKey> keys = nioSelector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();

Transmission transmission = getTransmission(key);
try {
if (key.isConnectable()) {
transmission.finishConnect();
if (transmission.ready()) {
connected.add(transmission.getConnectionId());
metrics.selectorConnectionCreated.inc();
} else {
unreadyConnections.add(transmission.getConnectionId());
}
}

/* if channel is not ready, finish prepare */
if (transmission.isConnected() && !transmission.ready()) {
transmission.prepare();
continue;
}

if (key.isReadable() && transmission.ready()) {
completedReceivesFutures.add(executorPool.submit(() -> read(key, transmission)));
readWriteKeySet.add(key);
} else if (key.isWritable() && transmission.ready()) {
completedSendsFutures.add(executorPool.submit(() -> write(key, transmission)));
readWriteKeySet.add(key);
} else if (!key.isValid()) {
close(key);
}
} catch (IOException e) {
// handles IOException from transmission.finishConnect() and transmission.prepare()
handleReadWriteIOException(e, key);
close(key);
} catch (Exception e) {
close(key);
metrics.selectorKeyOperationErrorCount.inc();
logger.error("closing key on exception remote host {}", channel(key).socket().getRemoteSocketAddress(), e);
}
}
for (Future<NetworkReceive> future : completedReceivesFutures) {
try {
NetworkReceive networkReceive = future.get();
if (networkReceive != null) {
readWriteKeySet.remove(keyForId(networkReceive.getConnectionId()));
if (networkReceive.getReceivedBytes().isReadComplete()) {
this.completedReceives.add(networkReceive);
}
}
} catch (InterruptedException | ExecutionException e) {
logger.error("Hit Unexpected exception on selector read, ", e);
}
}
for (Future<NetworkSend> future : completedSendsFutures) {
try {
NetworkSend networkSend = future.get();
if (networkSend != null) {
readWriteKeySet.remove(keyForId(networkSend.getConnectionId()));
if (networkSend.getPayload().isSendComplete()) {
this.completedSends.add(networkSend);
}
}
} catch (ExecutionException | InterruptedException e) {
logger.error("Hit Unexpected exception on selector write, ", e);
}
}
for (SelectionKey keyWithError : readWriteKeySet) {
close(keyWithError);
}
checkUnreadyConnectionsStatus();
this.metrics.selectorIOCount.inc(readyKeys);
this.metrics.selectorIOTime.update(time.milliseconds() - endSelect);
}
disconnected.addAll(closedConnections);
Expand Down Expand Up @@ -435,7 +565,6 @@ public List<String> connected() {
* @param connectionId a unique ID for this connection.
* @param key the {@link SelectionKey} used to communicate socket events.
* @param hostname the remote hostname for the connection, used for SSL host verification.
* @param hostname the remote port for the connection, used for SSL host verification.
* @param portType used to select between a plaintext or SSL transmission.
* @param mode for SSL transmissions, whether to operate in client or server mode.
* @return either a {@link Transmission} or {@link SSLTransmission}.
Expand Down Expand Up @@ -544,18 +673,38 @@ private SelectionKey keyForId(String id) {
return this.keyMap.get(id);
}

/**
* Handle read/write IOException
*/
private void handleReadWriteIOException(IOException e, SelectionKey key) {
String socketDescription = socketDescription(channel(key));
if (e instanceof EOFException || e instanceof ConnectException) {
metrics.selectorDisconnectedErrorCount.inc();
logger.error("Connection {} disconnected", socketDescription, e);
} else {
metrics.selectorIOErrorCount.inc();
logger.warn("Error in I/O with connection to {}", socketDescription, e);
}
}

/**
* Process reads from ready sockets
* @return the {@link NetworkReceive} if no IOException during read().
*/
private void read(SelectionKey key, Transmission transmission) throws IOException {
private NetworkReceive read(SelectionKey key, Transmission transmission) {
long startTimeToReadInMs = time.milliseconds();
try {
boolean readComplete = transmission.read();
NetworkReceive networkReceive = transmission.getNetworkReceive();
if (readComplete) {
this.completedReceives.add(transmission.getNetworkReceive());
transmission.onReceiveComplete();
transmission.clearReceive();
}
return networkReceive;
} catch (IOException e) {
// We have key information if we log IOException here.
handleReadWriteIOException(e, key);
return null;
} finally {
long readTime = time.milliseconds() - startTimeToReadInMs;
logger.trace("SocketServer time spent on read per key {} = {}", transmission.getConnectionId(), readTime);
Expand All @@ -564,19 +713,25 @@ private void read(SelectionKey key, Transmission transmission) throws IOExceptio

/**
* Process writes to ready sockets
* @return the {@link NetworkSend} if no IOException during write().
*/
private void write(SelectionKey key, Transmission transmission) throws IOException {
private NetworkSend write(SelectionKey key, Transmission transmission) {
long startTimeToWriteInMs = time.milliseconds();
try {
boolean sendComplete = transmission.write();
NetworkSend networkSend = transmission.getNetworkSend();
if (sendComplete) {
logger.trace("Finished writing, registering for read on connection {}", transmission.getRemoteSocketAddress());
transmission.onSendComplete();
this.completedSends.add(transmission.getNetworkSend());
metrics.sendInFlight.dec();
transmission.clearSend();
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE | SelectionKey.OP_READ);
}
return networkSend;
} catch (IOException e) {
// We have key information if we log IOException here.
handleReadWriteIOException(e, key);
return null;
} finally {
long writeTime = time.milliseconds() - startTimeToWriteInMs;
logger.trace("SocketServer time spent on write per key {} = {}", transmission.getConnectionId(), writeTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class SocketServer implements NetworkServer {
private final int sendBufferSize;
private final int recvBufferSize;
private final int maxRequestSize;
private final int selectorExecutorPoolSize;
private final ArrayList<Processor> processors;
private volatile ArrayList<Acceptor> acceptors;
private final SocketRequestResponseChannel requestResponseChannel;
Expand All @@ -72,6 +73,7 @@ public SocketServer(NetworkConfig config, SSLConfig sslConfig, MetricRegistry re
this.sendBufferSize = config.socketSendBufferBytes;
this.recvBufferSize = config.socketReceiveBufferBytes;
this.maxRequestSize = config.socketRequestMaxBytes;
this.selectorExecutorPoolSize = config.selectorExecutorPoolSize;
processors = new ArrayList<Processor>(numProcessorThreads);
requestResponseChannel = new SocketRequestResponseChannel(numProcessorThreads, maxQueuedRequests);
metrics = new ServerNetworkMetrics(requestResponseChannel, registry, processors);
Expand Down Expand Up @@ -149,7 +151,8 @@ private void validatePorts(ArrayList<Port> portList) {
public void start() throws IOException, InterruptedException {
logger.info("Starting {} processor threads", numProcessorThreads);
for (int i = 0; i < numProcessorThreads; i++) {
processors.add(i, new Processor(i, maxRequestSize, requestResponseChannel, metrics, sslFactory));
processors.add(i,
new Processor(i, maxRequestSize, requestResponseChannel, metrics, sslFactory, selectorExecutorPoolSize));
Utils.newThread("ambry-processor-" + port + " " + i, processors.get(i), false).start();
}

Expand Down Expand Up @@ -383,7 +386,6 @@ protected void accept(SelectionKey key, Processor processor) throws SocketExcept
* each of which has its own selectors
*/
class Processor extends AbstractServerThread {
private final int maxRequestSize;
private final SocketRequestResponseChannel channel;
private final int id;
private final Time time;
Expand All @@ -394,12 +396,11 @@ class Processor extends AbstractServerThread {
private static final long pollTimeoutMs = 300;

Processor(int id, int maxRequestSize, RequestResponseChannel channel, ServerNetworkMetrics metrics,
SSLFactory sslFactory) throws IOException {
this.maxRequestSize = maxRequestSize;
SSLFactory sslFactory, int selectorExecutorPoolSize) throws IOException {
this.channel = (SocketRequestResponseChannel) channel;
this.id = id;
this.time = SystemTime.getInstance();
selector = new Selector(metrics, time, sslFactory);
selector = new Selector(metrics, time, sslFactory, selectorExecutorPoolSize);
this.metrics = metrics;
}

Expand Down
Loading

0 comments on commit db6d337

Please sign in to comment.