diff --git a/ambry-api/src/main/java/com.github.ambry/config/NetworkConfig.java b/ambry-api/src/main/java/com.github.ambry/config/NetworkConfig.java index d624e85fb9..0692246696 100644 --- a/ambry-api/src/main/java/com.github.ambry/config/NetworkConfig.java +++ b/ambry-api/src/main/java/com.github.ambry/config/NetworkConfig.java @@ -76,6 +76,13 @@ public class NetworkConfig { @Default("false") public final boolean networkClientEnableConnectionReplenishment; + /** + * The size of the pool if selector executor pool is employed. When size is 0, executor pool won't be used. + */ + @Config("selector.executor.pool.size") + @Default("4") + public final int selectorExecutorPoolSize; + public NetworkConfig(VerifiableProperties verifiableProperties) { numIoThreads = verifiableProperties.getIntInRange("num.io.threads", 8, 1, Integer.MAX_VALUE); @@ -88,5 +95,7 @@ public NetworkConfig(VerifiableProperties verifiableProperties) { queuedMaxRequests = verifiableProperties.getIntInRange("queued.max.requests", 500, 1, Integer.MAX_VALUE); networkClientEnableConnectionReplenishment = verifiableProperties.getBoolean("network.client.enable.connection.replenishment", false); + selectorExecutorPoolSize = + verifiableProperties.getIntInRange("selector.executor.pool.size", 4, 0, Integer.MAX_VALUE); } } diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkClientFactory.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkClientFactory.java index 3085adaebc..47c79f2d93 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkClientFactory.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkClientFactory.java @@ -57,7 +57,7 @@ public NetworkClientFactory(NetworkMetrics networkMetrics, NetworkConfig network * @throws IOException if the {@link Selector} could not be instantiated. */ public NetworkClient getNetworkClient() throws IOException { - Selector selector = new Selector(networkMetrics, time, sslFactory); + Selector selector = new Selector(networkMetrics, time, sslFactory, networkConfig.selectorExecutorPoolSize); return new NetworkClient(selector, networkConfig, networkMetrics, maxConnectionsPerPortPlainText, maxConnectionsPerPortSsl, connectionCheckoutTimeoutMs, time); } diff --git a/ambry-network/src/main/java/com.github.ambry.network/SSLTransmission.java b/ambry-network/src/main/java/com.github.ambry.network/SSLTransmission.java index d4867133d0..903a237537 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/SSLTransmission.java +++ b/ambry-network/src/main/java/com.github.ambry.network/SSLTransmission.java @@ -160,7 +160,9 @@ private int readFromSocketChannel() throws IOException { private boolean flush(ByteBuffer buf) throws IOException { int remaining = buf.remaining(); if (remaining > 0) { + long startNs = SystemTime.getInstance().nanoseconds(); int written = socketChannel.write(buf); + logger.trace("Flushed {} bytes in {} ns", written, SystemTime.getInstance().nanoseconds() - startNs); return written >= remaining; } return true; @@ -531,13 +533,12 @@ public int write(ByteBuffer src) throws IOException { int written = 0; while (src.remaining() != 0) { netWriteBuffer.clear(); - long startTimeMs = SystemTime.getInstance().milliseconds(); + long startTimeNs = SystemTime.getInstance().nanoseconds(); SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer); - long encryptionTimeMs = SystemTime.getInstance().milliseconds() - startTimeMs; - logger.trace("SSL encryption time: {} ms for {} bytes", encryptionTimeMs, wrapResult.bytesConsumed()); + long encryptionTimeNs = SystemTime.getInstance().nanoseconds() - startTimeNs; + logger.trace("SSL encryption time: {} ns for {} bytes", encryptionTimeNs, wrapResult.bytesConsumed()); if (wrapResult.bytesConsumed() > 0) { - metrics.sslEncryptionTimeInUsPerKB.update( - TimeUnit.MILLISECONDS.toMicros(encryptionTimeMs) * 1024 / wrapResult.bytesConsumed()); + metrics.sslEncryptionTimeInUsPerKB.update(encryptionTimeNs / wrapResult.bytesConsumed()); } netWriteBuffer.flip(); //handle ssl renegotiation diff --git a/ambry-network/src/main/java/com.github.ambry.network/Selector.java b/ambry-network/src/main/java/com.github.ambry.network/Selector.java index 46a740e9b0..75affe0ff3 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Selector.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Selector.java @@ -32,6 +32,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,11 +85,12 @@ public class Selector implements Selectable { private final AtomicLong idGenerator; private final AtomicLong numActiveConnections; private final SSLFactory sslFactory; + private final ExecutorService executorPool; /** * Create a new selector */ - public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) throws IOException { + public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory, int executorPoolSize) throws IOException { this.nioSelector = java.nio.channels.Selector.open(); this.time = time; this.keyMap = new HashMap<>(); @@ -99,6 +104,11 @@ public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) throws idGenerator = new AtomicLong(0); numActiveConnections = new AtomicLong(0); unreadyConnections = new HashSet<>(); + if (executorPoolSize > 0) { + executorPool = Executors.newFixedThreadPool(executorPoolSize); + } else { + executorPool = null; + } metrics.registerSelectorActiveConnections(numActiveConnections); metrics.registerSelectorUnreadyConnections(unreadyConnections); } @@ -291,13 +301,24 @@ public void poll(long timeoutMs) throws IOException { * completed I/O. * * @param timeoutMs The amount of time to wait, in milliseconds. If negative, wait indefinitely. - * @param sends The list of new sends to begin + * @param sends The list of new sends to initiate. * * @throws IOException If a send is given for which we have no existing connection or for which there is * already an in-progress send */ @Override public void poll(long timeoutMs, List sends) throws IOException { + if (executorPool != null) { + pollWithExecutorPool(timeoutMs, sends); + } else { + pollOnMainThread(timeoutMs, sends); + } + } + + /** + * Process read/write events on current thread. + */ + private void pollOnMainThread(long timeoutMs, List sends) throws IOException { clear(); // register for write interest on any new sends @@ -340,21 +361,26 @@ public void poll(long timeoutMs, List sends) throws IOException { } if (key.isReadable() && transmission.ready()) { - read(key, transmission); + NetworkReceive networkReceive = read(key, transmission); + if (networkReceive == null) { + // Exception happened in read. + close(key); + } else if (networkReceive.getReceivedBytes().isReadComplete()) { + this.completedReceives.add(networkReceive); + } } else if (key.isWritable() && transmission.ready()) { - write(key, transmission); + NetworkSend networkSend = write(key, transmission); + if (networkSend == null) { + // Exception happened in write. + close(key); + } else if (networkSend.getPayload().isSendComplete()) { + this.completedSends.add(networkSend); + } } else if (!key.isValid()) { close(key); } } catch (IOException e) { - String socketDescription = socketDescription(channel(key)); - if (e instanceof EOFException || e instanceof ConnectException) { - metrics.selectorDisconnectedErrorCount.inc(); - logger.error("Connection {} disconnected", socketDescription, e); - } else { - metrics.selectorIOErrorCount.inc(); - logger.warn("Error in I/O with connection to {}", socketDescription, e); - } + handleReadWriteIOException(e, key); close(key); } catch (Exception e) { metrics.selectorKeyOperationErrorCount.inc(); @@ -363,7 +389,111 @@ public void poll(long timeoutMs, List sends) throws IOException { } } checkUnreadyConnectionsStatus(); - this.metrics.selectorIOCount.inc(); + this.metrics.selectorIOCount.inc(readyKeys); + this.metrics.selectorIOTime.update(time.milliseconds() - endSelect); + } + disconnected.addAll(closedConnections); + closedConnections.clear(); + } + + /** + * Use {@link ExecutorService} to process read/write events. + */ + private void pollWithExecutorPool(long timeoutMs, List sends) throws IOException { + List> completedSendsFutures = new ArrayList<>(); + List> completedReceivesFutures = new ArrayList<>(); + Set readWriteKeySet = new HashSet<>(); + clear(); + + // register for write interest on any new sends + if (sends != null) { + for (NetworkSend networkSend : sends) { + send(networkSend); + } + } + + // check ready keys + long startSelect = time.milliseconds(); + int readyKeys = select(timeoutMs); + this.metrics.selectorSelectCount.inc(); + + if (readyKeys > 0) { + long endSelect = time.milliseconds(); + this.metrics.selectorSelectTime.update(endSelect - startSelect); + Set keys = nioSelector.selectedKeys(); + Iterator iter = keys.iterator(); + while (iter.hasNext()) { + SelectionKey key = iter.next(); + iter.remove(); + + Transmission transmission = getTransmission(key); + try { + if (key.isConnectable()) { + transmission.finishConnect(); + if (transmission.ready()) { + connected.add(transmission.getConnectionId()); + metrics.selectorConnectionCreated.inc(); + } else { + unreadyConnections.add(transmission.getConnectionId()); + } + } + + /* if channel is not ready, finish prepare */ + if (transmission.isConnected() && !transmission.ready()) { + transmission.prepare(); + continue; + } + + if (key.isReadable() && transmission.ready()) { + completedReceivesFutures.add(executorPool.submit(() -> read(key, transmission))); + readWriteKeySet.add(key); + } else if (key.isWritable() && transmission.ready()) { + completedSendsFutures.add(executorPool.submit(() -> write(key, transmission))); + readWriteKeySet.add(key); + } else if (!key.isValid()) { + close(key); + } + } catch (IOException e) { + // handles IOException from transmission.finishConnect() and transmission.prepare() + handleReadWriteIOException(e, key); + close(key); + } catch (Exception e) { + close(key); + metrics.selectorKeyOperationErrorCount.inc(); + logger.error("closing key on exception remote host {}", channel(key).socket().getRemoteSocketAddress(), e); + } + } + for (Future future : completedReceivesFutures) { + try { + NetworkReceive networkReceive = future.get(); + if (networkReceive != null) { + readWriteKeySet.remove(keyForId(networkReceive.getConnectionId())); + if (networkReceive.getReceivedBytes().isReadComplete()) { + this.completedReceives.add(networkReceive); + } + } + } catch (InterruptedException | ExecutionException e) { + logger.error("Hit Unexpected exception on selector read, ", e); + } + } + for (Future future : completedSendsFutures) { + try { + NetworkSend networkSend = future.get(); + if (networkSend != null) { + readWriteKeySet.remove(keyForId(networkSend.getConnectionId())); + if (networkSend.getPayload().isSendComplete()) { + this.completedSends.add(networkSend); + } + } + } catch (ExecutionException | InterruptedException e) { + logger.error("Hit Unexpected exception on selector write, ", e); + } + } + for (SelectionKey keyWithError : readWriteKeySet) { + close(keyWithError); + } + checkUnreadyConnectionsStatus(); + this.metrics.selectorIOCount.inc(readyKeys); this.metrics.selectorIOTime.update(time.milliseconds() - endSelect); } disconnected.addAll(closedConnections); @@ -435,7 +565,6 @@ public List connected() { * @param connectionId a unique ID for this connection. * @param key the {@link SelectionKey} used to communicate socket events. * @param hostname the remote hostname for the connection, used for SSL host verification. - * @param hostname the remote port for the connection, used for SSL host verification. * @param portType used to select between a plaintext or SSL transmission. * @param mode for SSL transmissions, whether to operate in client or server mode. * @return either a {@link Transmission} or {@link SSLTransmission}. @@ -544,18 +673,38 @@ private SelectionKey keyForId(String id) { return this.keyMap.get(id); } + /** + * Handle read/write IOException + */ + private void handleReadWriteIOException(IOException e, SelectionKey key) { + String socketDescription = socketDescription(channel(key)); + if (e instanceof EOFException || e instanceof ConnectException) { + metrics.selectorDisconnectedErrorCount.inc(); + logger.error("Connection {} disconnected", socketDescription, e); + } else { + metrics.selectorIOErrorCount.inc(); + logger.warn("Error in I/O with connection to {}", socketDescription, e); + } + } + /** * Process reads from ready sockets + * @return the {@link NetworkReceive} if no IOException during read(). */ - private void read(SelectionKey key, Transmission transmission) throws IOException { + private NetworkReceive read(SelectionKey key, Transmission transmission) { long startTimeToReadInMs = time.milliseconds(); try { boolean readComplete = transmission.read(); + NetworkReceive networkReceive = transmission.getNetworkReceive(); if (readComplete) { - this.completedReceives.add(transmission.getNetworkReceive()); transmission.onReceiveComplete(); transmission.clearReceive(); } + return networkReceive; + } catch (IOException e) { + // We have key information if we log IOException here. + handleReadWriteIOException(e, key); + return null; } finally { long readTime = time.milliseconds() - startTimeToReadInMs; logger.trace("SocketServer time spent on read per key {} = {}", transmission.getConnectionId(), readTime); @@ -564,19 +713,25 @@ private void read(SelectionKey key, Transmission transmission) throws IOExceptio /** * Process writes to ready sockets + * @return the {@link NetworkSend} if no IOException during write(). */ - private void write(SelectionKey key, Transmission transmission) throws IOException { + private NetworkSend write(SelectionKey key, Transmission transmission) { long startTimeToWriteInMs = time.milliseconds(); try { boolean sendComplete = transmission.write(); + NetworkSend networkSend = transmission.getNetworkSend(); if (sendComplete) { logger.trace("Finished writing, registering for read on connection {}", transmission.getRemoteSocketAddress()); transmission.onSendComplete(); - this.completedSends.add(transmission.getNetworkSend()); metrics.sendInFlight.dec(); transmission.clearSend(); key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE | SelectionKey.OP_READ); } + return networkSend; + } catch (IOException e) { + // We have key information if we log IOException here. + handleReadWriteIOException(e, key); + return null; } finally { long writeTime = time.milliseconds() - startTimeToWriteInMs; logger.trace("SocketServer time spent on write per key {} = {}", transmission.getConnectionId(), writeTime); diff --git a/ambry-network/src/main/java/com.github.ambry.network/SocketServer.java b/ambry-network/src/main/java/com.github.ambry.network/SocketServer.java index 73f6cf3141..57f6c72743 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/SocketServer.java +++ b/ambry-network/src/main/java/com.github.ambry.network/SocketServer.java @@ -56,6 +56,7 @@ public class SocketServer implements NetworkServer { private final int sendBufferSize; private final int recvBufferSize; private final int maxRequestSize; + private final int selectorExecutorPoolSize; private final ArrayList processors; private volatile ArrayList acceptors; private final SocketRequestResponseChannel requestResponseChannel; @@ -72,6 +73,7 @@ public SocketServer(NetworkConfig config, SSLConfig sslConfig, MetricRegistry re this.sendBufferSize = config.socketSendBufferBytes; this.recvBufferSize = config.socketReceiveBufferBytes; this.maxRequestSize = config.socketRequestMaxBytes; + this.selectorExecutorPoolSize = config.selectorExecutorPoolSize; processors = new ArrayList(numProcessorThreads); requestResponseChannel = new SocketRequestResponseChannel(numProcessorThreads, maxQueuedRequests); metrics = new ServerNetworkMetrics(requestResponseChannel, registry, processors); @@ -149,7 +151,8 @@ private void validatePorts(ArrayList portList) { public void start() throws IOException, InterruptedException { logger.info("Starting {} processor threads", numProcessorThreads); for (int i = 0; i < numProcessorThreads; i++) { - processors.add(i, new Processor(i, maxRequestSize, requestResponseChannel, metrics, sslFactory)); + processors.add(i, + new Processor(i, maxRequestSize, requestResponseChannel, metrics, sslFactory, selectorExecutorPoolSize)); Utils.newThread("ambry-processor-" + port + " " + i, processors.get(i), false).start(); } @@ -383,7 +386,6 @@ protected void accept(SelectionKey key, Processor processor) throws SocketExcept * each of which has its own selectors */ class Processor extends AbstractServerThread { - private final int maxRequestSize; private final SocketRequestResponseChannel channel; private final int id; private final Time time; @@ -394,12 +396,11 @@ class Processor extends AbstractServerThread { private static final long pollTimeoutMs = 300; Processor(int id, int maxRequestSize, RequestResponseChannel channel, ServerNetworkMetrics metrics, - SSLFactory sslFactory) throws IOException { - this.maxRequestSize = maxRequestSize; + SSLFactory sslFactory, int selectorExecutorPoolSize) throws IOException { this.channel = (SocketRequestResponseChannel) channel; this.id = id; this.time = SystemTime.getInstance(); - selector = new Selector(metrics, time, sslFactory); + selector = new Selector(metrics, time, sslFactory, selectorExecutorPoolSize); this.metrics = metrics; } diff --git a/ambry-network/src/test/java/com.github.ambry.network/EchoServer.java b/ambry-network/src/test/java/com.github.ambry.network/EchoServer.java index 55a7fd90b7..178c6f6abe 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/EchoServer.java +++ b/ambry-network/src/test/java/com.github.ambry.network/EchoServer.java @@ -85,6 +85,7 @@ public void run() { @Override public void run() { try { + socket.setSoTimeout(3000); DataInputStream input = new DataInputStream(socket.getInputStream()); DataOutputStream output = new DataOutputStream(socket.getOutputStream()); while (socket.isConnected() && !socket.isClosed()) { diff --git a/ambry-network/src/test/java/com.github.ambry.network/NetworkClientTest.java b/ambry-network/src/test/java/com.github.ambry.network/NetworkClientTest.java index c2796312cf..84ff3223f2 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/NetworkClientTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/NetworkClientTest.java @@ -332,7 +332,7 @@ public void testConnectionReplenishment() { .collect(Collectors.toList()); // 1 host x 1 port x 3 connections x 100% int warmUpPercentage = 100; - AtomicInteger expectedConnectCalls = new AtomicInteger(warmUpPercentage * 3 /100); + AtomicInteger expectedConnectCalls = new AtomicInteger(warmUpPercentage * 3 / 100); Runnable checkConnectCalls = () -> Assert.assertEquals(expectedConnectCalls.get(), selector.connectCallCount()); networkClient.warmUpConnections(Collections.singletonList(replicaOnSslNode.getDataNodeId()), warmUpPercentage, @@ -647,7 +647,7 @@ class MockSelector extends Selector { * @throws IOException if {@link Selector} throws. */ MockSelector() throws IOException { - super(new NetworkMetrics(new MetricRegistry()), new MockTime(), null); + super(new NetworkMetrics(new MetricRegistry()), new MockTime(), null, 0); super.close(); } diff --git a/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java b/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java index adb7ecb043..957866335f 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java @@ -26,6 +26,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Random; @@ -33,11 +34,14 @@ import org.junit.After; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static java.util.Arrays.*; import static org.junit.Assert.*; +@RunWith(Parameterized.class) public class SSLSelectorTest { private static final int DEFAULT_SOCKET_BUF_SIZE = 4 * 1024; @@ -46,8 +50,15 @@ public class SSLSelectorTest { private final EchoServer server; private Selector selector; private final File trustStoreFile; + private final int poolSize; - public SSLSelectorTest() throws Exception { + @Parameterized.Parameters + public static List data() { + return Arrays.asList(new Object[][]{{0}, {2}}); + } + + public SSLSelectorTest(int poolSize) throws Exception { + this.poolSize = poolSize; trustStoreFile = File.createTempFile("truststore", ".jks"); SSLConfig sslConfig = new SSLConfig(TestSSLUtils.createSslProps("DC1,DC2,DC3", SSLFactory.Mode.SERVER, trustStoreFile, "server")); @@ -60,7 +71,8 @@ public SSLSelectorTest() throws Exception { applicationBufferSize = clientSSLFactory.createSSLEngine("localhost", server.port, SSLFactory.Mode.CLIENT) .getSession() .getApplicationBufferSize(); - selector = new Selector(new NetworkMetrics(new MetricRegistry()), SystemTime.getInstance(), clientSSLFactory); + selector = + new Selector(new NetworkMetrics(new MetricRegistry()), SystemTime.getInstance(), clientSSLFactory, poolSize); } @After @@ -163,6 +175,7 @@ public void testNormalOperation() throws Exception { while (responseCount < conns) { // do the i/o selector.poll(0L, sends); + Thread.sleep(100); assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size()); @@ -327,7 +340,7 @@ private void useCustomBufferSizeSelector(final Integer netReadBufSizeStart, fina selector.close(); NetworkMetrics metrics = new NetworkMetrics(new MetricRegistry()); Time time = SystemTime.getInstance(); - selector = new Selector(metrics, time, clientSSLFactory) { + selector = new Selector(metrics, time, clientSSLFactory, poolSize) { @Override protected Transmission createTransmission(String connectionId, SelectionKey key, String hostname, int port, PortType portType, SSLFactory.Mode mode) throws IOException { diff --git a/ambry-network/src/test/java/com.github.ambry.network/SelectorTest.java b/ambry-network/src/test/java/com.github.ambry.network/SelectorTest.java index 3b29f7f740..8429bbdf2a 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/SelectorTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/SelectorTest.java @@ -19,11 +19,14 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Random; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static java.util.Arrays.*; import static org.junit.Assert.*; @@ -32,17 +35,28 @@ /** * A set of tests for the selector. These use a test harness that runs a simple socket server that echos back responses. */ +@RunWith(Parameterized.class) public class SelectorTest { - private static final int BUFFER_SIZE = 4 * 1024; private EchoServer server; private Selector selector; + private int selectorExecutorPoolSize; + + @Parameterized.Parameters + public static List data() { + return Arrays.asList(new Object[][]{{0}, {2}}); + } + + public SelectorTest(int poolSize) { + selectorExecutorPoolSize = poolSize; + } @Before public void setup() throws Exception { this.server = new EchoServer(18283); this.server.start(); - this.selector = new Selector(new NetworkMetrics(new MetricRegistry()), SystemTime.getInstance(), null); + this.selector = new Selector(new NetworkMetrics(new MetricRegistry()), SystemTime.getInstance(), null, + selectorExecutorPoolSize); } @After @@ -170,7 +184,7 @@ public void testNormalOperation() throws Exception { // loop until we complete all requests while (responseCount < conns * reqs) { // do the i/o - selector.poll(0L, sends); + selector.poll(100L, sends); assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size()); diff --git a/ambry-router/src/test/java/com.github.ambry.router/MockSelector.java b/ambry-router/src/test/java/com.github.ambry.router/MockSelector.java index 77c3da907b..aaa33fa66d 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/MockSelector.java +++ b/ambry-router/src/test/java/com.github.ambry.router/MockSelector.java @@ -55,7 +55,7 @@ class MockSelector extends Selector { * @throws IOException if {@link Selector} throws. */ MockSelector(MockServerLayout serverLayout, AtomicReference state, Time time) throws IOException { - super(new NetworkMetrics(new MetricRegistry()), time, null); + super(new NetworkMetrics(new MetricRegistry()), time, null, 0); // we don't need the actual selector, close it. super.close(); this.serverLayout = serverLayout;