diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 22aced389f8a9..31d871a2ae8dc 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -50,6 +50,7 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; @@ -181,6 +182,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i private final CounterMetric numHandshakes = new CounterMetric(); private static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake"; + private final MeanMetric readBytesMetric = new MeanMetric(); + private final MeanMetric transmittedBytesMetric = new MeanMetric(); + public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { @@ -300,14 +304,14 @@ protected void doRunInLifecycle() throws Exception { DiscoveryNode node = entry.getKey(); NodeChannels channels = entry.getValue(); for (Channel channel : channels.getChannels()) { - internalSendMessage(channel, pingHeader, new NotifyOnceListener() { + internalSendMessage(channel, pingHeader, new SendMetricListener(pingHeader.length()) { @Override - public void innerOnResponse(Channel channel) { + protected void innerInnerOnResponse(Channel channel) { successfulPings.inc(); } @Override - public void innerOnFailure(Exception e) { + protected void innerOnFailure(Exception e) { if (isOpen(channel)) { logger.debug( (Supplier) () -> new ParameterizedMessage("[{}] failed to send ping transport message", node), e); @@ -984,9 +988,10 @@ protected void onException(Channel channel, Exception e) { } else if (e instanceof TcpTransport.HttpOnTransportException) { // in case we are able to return data, serialize the exception content and sent it back to the client if (isOpen(channel)) { - final NotifyOnceListener closeChannel = new NotifyOnceListener() { + BytesArray message = new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8)); + final SendMetricListener closeChannel = new SendMetricListener(message.length()) { @Override - public void innerOnResponse(Channel channel) { + protected void innerInnerOnResponse(Channel channel) { try { closeChannels(Collections.singletonList(channel)); } catch (IOException e1) { @@ -995,7 +1000,7 @@ public void innerOnResponse(Channel channel) { } @Override - public void innerOnFailure(Exception e) { + protected void innerOnFailure(Exception e) { try { closeChannels(Collections.singletonList(channel)); } catch (IOException e1) { @@ -1004,7 +1009,7 @@ public void innerOnFailure(Exception e) { } } }; - internalSendMessage(channel, new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8)), closeChannel); + internalSendMessage(channel, message, closeChannel); } } else { logger.warn( @@ -1086,7 +1091,7 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target final TransportRequestOptions finalOptions = options; // this might be called in a different thread SendListener onRequestSent = new SendListener(stream, - () -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions)); + () -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions), message.length()); internalSendMessage(targetChannel, message, onRequestSent); addedReleaseListener = true; } finally { @@ -1099,7 +1104,7 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target /** * sends a message to the given channel, using the given callbacks. */ - private void internalSendMessage(Channel targetChannel, BytesReference message, NotifyOnceListener listener) { + private void internalSendMessage(Channel targetChannel, BytesReference message, SendMetricListener listener) { try { sendMessage(targetChannel, message, listener); } catch (Exception ex) { @@ -1131,9 +1136,10 @@ public void sendErrorResponse(Version nodeVersion, Channel channel, final Except status = TransportStatus.setError(status); final BytesReference bytes = stream.bytes(); final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length()); + CompositeBytesReference message = new CompositeBytesReference(header, bytes); SendListener onResponseSent = new SendListener(null, - () -> transportServiceAdapter.onResponseSent(requestId, action, error)); - internalSendMessage(channel, new CompositeBytesReference(header, bytes), onResponseSent); + () -> transportServiceAdapter.onResponseSent(requestId, action, error), message.length()); + internalSendMessage(channel, message, onResponseSent); } } @@ -1162,13 +1168,13 @@ private void sendResponse(Version nodeVersion, Channel channel, final TransportR } threadPool.getThreadContext().writeTo(stream); stream.setVersion(nodeVersion); - BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream); + BytesReference message = buildMessage(requestId, status, nodeVersion, response, stream); final TransportResponseOptions finalOptions = options; // this might be called in a different thread SendListener listener = new SendListener(stream, - () -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions)); - internalSendMessage(channel, reference, listener); + () -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions), message.length()); + internalSendMessage(channel, message, listener); addedReleaseListener = true; } finally { if (!addedReleaseListener) { @@ -1324,7 +1330,7 @@ public HttpOnTransportException(StreamInput in) throws IOException { public final void messageReceived(BytesReference reference, Channel channel, String profileName, InetSocketAddress remoteAddress, int messageLengthBytes) throws IOException { final int totalMessageSize = messageLengthBytes + TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE; - transportServiceAdapter.addBytesReceived(totalMessageSize); + readBytesMetric.inc(totalMessageSize); // we have additional bytes to read, outside of the header boolean hasMessageBytesToRead = (totalMessageSize - TcpHeader.HEADER_SIZE) > 0; StreamInput streamIn = reference.streamInput(); @@ -1662,22 +1668,42 @@ protected final void ensureOpen() { } } - private final class SendListener extends NotifyOnceListener { + /** + * This listener increments the transmitted bytes metric on success. + */ + private abstract class SendMetricListener extends NotifyOnceListener { + private final long messageSize; + + private SendMetricListener(long messageSize) { + this.messageSize = messageSize; + } + + @Override + protected final void innerOnResponse(T object) { + transmittedBytesMetric.inc(messageSize); + innerInnerOnResponse(object); + } + + protected abstract void innerInnerOnResponse(T object); + } + + private final class SendListener extends SendMetricListener { private final Releasable optionalReleasable; private final Runnable transportAdaptorCallback; - private SendListener(Releasable optionalReleasable, Runnable transportAdaptorCallback) { + private SendListener(Releasable optionalReleasable, Runnable transportAdaptorCallback, long messageLength) { + super(messageLength); this.optionalReleasable = optionalReleasable; this.transportAdaptorCallback = transportAdaptorCallback; } @Override - public void innerOnResponse(Channel channel) { + protected void innerInnerOnResponse(Channel channel) { release(); } @Override - public void innerOnFailure(Exception e) { + protected void innerOnFailure(Exception e) { release(); } @@ -1701,4 +1727,16 @@ final int getNumOpenConnections() { final int getNumConnectedNodes() { return connectedNodes.size(); } + + /** + * Returns count of currently open connections + */ + protected abstract long getNumOpenServerConnections(); + + @Override + public final TransportStats getStats() { + return new TransportStats( + getNumOpenServerConnections(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(), + transmittedBytesMetric.sum()); + } } diff --git a/core/src/main/java/org/elasticsearch/transport/Transport.java b/core/src/main/java/org/elasticsearch/transport/Transport.java index a32289332ead5..5d22e156d9d13 100644 --- a/core/src/main/java/org/elasticsearch/transport/Transport.java +++ b/core/src/main/java/org/elasticsearch/transport/Transport.java @@ -75,11 +75,6 @@ void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile, */ void disconnectFromNode(DiscoveryNode node); - /** - * Returns count of currently open connections - */ - long serverOpen(); - List getLocalAddresses(); default CircuitBreaker getInFlightRequestBreaker() { @@ -110,6 +105,8 @@ default CircuitBreaker getInFlightRequestBreaker() { */ Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException; + TransportStats getStats(); + /** * A unidirectional connection to a {@link DiscoveryNode} */ diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index e5382e4e2617b..0a4745cda7995 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -203,8 +203,6 @@ void setTracerLogExclude(List tracerLogExclude) { @Override protected void doStart() { - adapter.rxMetric.clear(); - adapter.txMetric.clear(); transport.transportServiceAdapter(adapter); transport.start(); @@ -292,8 +290,7 @@ public TransportInfo info() { } public TransportStats stats() { - return new TransportStats( - transport.serverOpen(), adapter.rxMetric.count(), adapter.rxMetric.sum(), adapter.txMetric.count(), adapter.txMetric.sum()); + return transport.getStats(); } public BoundTransportAddress boundAddress() { @@ -738,19 +735,6 @@ protected RequestHandlerRegistry getRequestHandler(String action) { protected class Adapter implements TransportServiceAdapter { - final MeanMetric rxMetric = new MeanMetric(); - final MeanMetric txMetric = new MeanMetric(); - - @Override - public void addBytesReceived(long size) { - rxMetric.inc(size); - } - - @Override - public void addBytesSent(long size) { - txMetric.inc(size); - } - @Override public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) { diff --git a/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java b/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java index 70748b01a6802..24a71a99998a4 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java @@ -23,10 +23,6 @@ public interface TransportServiceAdapter extends TransportConnectionListener { - void addBytesReceived(long size); - - void addBytesSent(long size); - /** called by the {@link Transport} implementation once a request has been sent */ void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options); diff --git a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index 142216bf2dd39..dbe858982090c 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -41,6 +41,7 @@ import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportServiceAdapter; +import org.elasticsearch.transport.TransportStats; import java.io.IOException; import java.net.UnknownHostException; @@ -193,11 +194,6 @@ public void disconnectFromNode(DiscoveryNode node) { } - @Override - public long serverOpen() { - return 0; - } - @Override public Lifecycle.State lifecycleState() { return null; @@ -231,4 +227,9 @@ public Map profileBoundAddresses() { public long newRequestId() { return requestId.incrementAndGet(); } + + @Override + public TransportStats getStats() { + throw new UnsupportedOperationException(); + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index a1b80803e0c7b..2e7a857cc7bc9 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportServiceAdapter; +import org.elasticsearch.transport.TransportStats; import org.junit.After; import org.junit.Before; @@ -241,11 +242,6 @@ public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) return getConnection(node); } - @Override - public long serverOpen() { - return 0; - } - @Override public List getLocalAddresses() { return null; @@ -263,12 +259,10 @@ public Lifecycle.State lifecycleState() { @Override public void addLifecycleListener(LifecycleListener listener) { - } @Override public void removeLifecycleListener(LifecycleListener listener) { - } @Override @@ -279,5 +273,10 @@ public void stop() {} @Override public void close() {} + + @Override + public TransportStats getStats() { + throw new UnsupportedOperationException(); + } } } diff --git a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java index a68416cc25a6c..6ce6c2a96d604 100644 --- a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java @@ -235,7 +235,7 @@ protected boolean isOpen(Object o) { } @Override - public long serverOpen() { + public long getNumOpenServerConnections() { return 0; } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java index e83cfc62fda08..9763a5116b163 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java @@ -22,11 +22,8 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.transport.TcpHeader; -import org.elasticsearch.transport.TcpTransport; -import org.elasticsearch.transport.TransportServiceAdapter; import org.elasticsearch.transport.Transports; import java.net.InetSocketAddress; @@ -37,25 +34,14 @@ */ final class Netty4MessageChannelHandler extends ChannelDuplexHandler { - private final TransportServiceAdapter transportServiceAdapter; private final Netty4Transport transport; private final String profileName; Netty4MessageChannelHandler(Netty4Transport transport, String profileName) { - this.transportServiceAdapter = transport.transportServiceAdapter(); this.transport = transport; this.profileName = profileName; } - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (msg instanceof ByteBuf && transportServiceAdapter != null) { - // record the number of bytes send on the channel - promise.addListener(f -> transportServiceAdapter.addBytesSent(((ByteBuf) msg).readableBytes())); - } - ctx.write(msg, promise); - } - @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Transports.assertTransportThread(); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index abe0739c243f1..140041b53b7c1 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -306,7 +306,7 @@ protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) } @Override - public long serverOpen() { + public long getNumOpenServerConnections() { Netty4OpenChannelsHandler channels = serverOpenChannels; return channels == null ? 0 : channels.numberOfOpenChannels(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index 55519ec2af22a..2ccddf6bc5437 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -40,6 +40,7 @@ import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportServiceAdapter; +import org.elasticsearch.transport.TransportStats; import java.io.IOException; import java.io.UncheckedIOException; @@ -213,6 +214,11 @@ public void close() throws IOException { }; } + @Override + public TransportStats getStats() { + throw new UnsupportedOperationException(); + } + @Override public void transportServiceAdapter(TransportServiceAdapter adapter) { this.adapter = adapter; @@ -250,11 +256,6 @@ public void disconnectFromNode(DiscoveryNode node) { } - @Override - public long serverOpen() { - return 0; - } - @Override public Lifecycle.State lifecycleState() { return null; diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 210190940d229..467b2c7f3c8fd 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -56,6 +56,7 @@ import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportServiceAdapter; +import org.elasticsearch.transport.TransportStats; import java.io.IOException; import java.net.UnknownHostException; @@ -572,11 +573,6 @@ public void disconnectFromNode(DiscoveryNode node) { transport.disconnectFromNode(node); } - @Override - public long serverOpen() { - return transport.serverOpen(); - } - @Override public List getLocalAddresses() { return transport.getLocalAddresses(); @@ -609,6 +605,11 @@ public void sendRequest(long requestId, String action, TransportRequest request, }; } + @Override + public TransportStats getStats() { + return transport.getStats(); + } + @Override public Lifecycle.State lifecycleState() { return transport.lifecycleState(); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 99704235cc734..e4f2fbae91798 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -2252,4 +2252,194 @@ public String executor() { assertPendingConnections(0, serviceC.getOriginalTransport()); } + public void testTransportStats() throws IOException, InterruptedException { + MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true); + CountDownLatch receivedLatch = new CountDownLatch(1); + CountDownLatch sendResponseLatch = new CountDownLatch(1); + serviceB.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME, + (request, channel) -> { + // don't block on a network thread here + threadPool.generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (IOException e1) { + throw new UncheckedIOException(e1); + } + } + + @Override + protected void doRun() throws Exception { + receivedLatch.countDown(); + sendResponseLatch.await(); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + }); + }); + serviceC.start(); + serviceC.acceptIncomingRequests(); + CountDownLatch responseLatch = new CountDownLatch(1); + TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { + @Override + public TransportResponse newInstance() { + return TransportResponse.Empty.INSTANCE; + } + + @Override + public void handleResponse(TransportResponse response) { + responseLatch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + responseLatch.countDown(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }; + + TransportStats stats = serviceC.transport.getStats(); // nothing transmitted / read yet + assertEquals(0, stats.getRxCount()); + assertEquals(0, stats.getTxCount()); + assertEquals(0, stats.getRxSize().getBytes()); + assertEquals(0, stats.getTxSize().getBytes()); + + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.addConnections(1, + TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.PING, + TransportRequestOptions.Type.RECOVERY, + TransportRequestOptions.Type.REG, + TransportRequestOptions.Type.STATE); + try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) { + stats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake + assertEquals(1, stats.getRxCount()); + assertEquals(1, stats.getTxCount()); + assertEquals(25, stats.getRxSize().getBytes()); + assertEquals(45, stats.getTxSize().getBytes()); + serviceC.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY, + transportResponseHandler); + receivedLatch.await(); + stats = serviceC.transport.getStats(); // request has ben send + assertEquals(1, stats.getRxCount()); + assertEquals(2, stats.getTxCount()); + assertEquals(25, stats.getRxSize().getBytes()); + assertEquals(91, stats.getTxSize().getBytes()); + sendResponseLatch.countDown(); + responseLatch.await(); + stats = serviceC.transport.getStats(); // response has been received + assertEquals(2, stats.getRxCount()); + assertEquals(2, stats.getTxCount()); + assertEquals(46, stats.getRxSize().getBytes()); + assertEquals(91, stats.getTxSize().getBytes()); + } finally { + try { + assertPendingConnections(0, serviceC.getOriginalTransport()); + } finally { + serviceC.close(); + } + } + } + + public void testTransportStatsWithException() throws IOException, InterruptedException { + MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true); + CountDownLatch receivedLatch = new CountDownLatch(1); + CountDownLatch sendResponseLatch = new CountDownLatch(1); + Exception ex = new RuntimeException("boom"); + ex.setStackTrace(new StackTraceElement[0]); + serviceB.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME, + (request, channel) -> { + // don't block on a network thread here + threadPool.generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (IOException e1) { + throw new UncheckedIOException(e1); + } + } + + @Override + protected void doRun() throws Exception { + receivedLatch.countDown(); + sendResponseLatch.await(); + onFailure(ex); + } + }); + }); + serviceC.start(); + serviceC.acceptIncomingRequests(); + CountDownLatch responseLatch = new CountDownLatch(1); + TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { + @Override + public TransportResponse newInstance() { + return TransportResponse.Empty.INSTANCE; + } + + @Override + public void handleResponse(TransportResponse response) { + responseLatch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + responseLatch.countDown(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }; + + TransportStats stats = serviceC.transport.getStats(); // nothing transmitted / read yet + assertEquals(0, stats.getRxCount()); + assertEquals(0, stats.getTxCount()); + assertEquals(0, stats.getRxSize().getBytes()); + assertEquals(0, stats.getTxSize().getBytes()); + + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.addConnections(1, + TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.PING, + TransportRequestOptions.Type.RECOVERY, + TransportRequestOptions.Type.REG, + TransportRequestOptions.Type.STATE); + try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) { + stats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake + assertEquals(1, stats.getRxCount()); + assertEquals(1, stats.getTxCount()); + assertEquals(25, stats.getRxSize().getBytes()); + assertEquals(45, stats.getTxSize().getBytes()); + serviceC.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY, + transportResponseHandler); + receivedLatch.await(); + stats = serviceC.transport.getStats(); // request has ben send + assertEquals(1, stats.getRxCount()); + assertEquals(2, stats.getTxCount()); + assertEquals(25, stats.getRxSize().getBytes()); + assertEquals(91, stats.getTxSize().getBytes()); + sendResponseLatch.countDown(); + responseLatch.await(); + stats = serviceC.transport.getStats(); // exception response has been received + assertEquals(2, stats.getRxCount()); + assertEquals(2, stats.getTxCount()); + int addressLen = serviceB.boundAddress().publishAddress().address().getAddress().getAddress().length; + // if we are bound to a IPv6 address the response address is serialized with the exception so it will be different depending + // on the stack. The emphemeral port will always be in the same range + assertEquals(185 + addressLen, stats.getRxSize().getBytes()); + assertEquals(91, stats.getTxSize().getBytes()); + } finally { + try { + assertPendingConnections(0, serviceC.getOriginalTransport()); + } finally { + serviceC.close(); + } + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 38a1701a7e1ce..94f5351cae789 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -248,7 +248,7 @@ protected void closeChannels(List channel) throws IOException { } @Override - public long serverOpen() { + public long getNumOpenServerConnections() { return 1; } @@ -306,7 +306,9 @@ public void accept(Executor executor) throws IOException { configureSocket(incomingSocket); synchronized (this) { if (isOpen.get()) { - incomingChannel = new MockChannel(incomingSocket, localAddress, profile, workerChannels::remove); + incomingChannel = new MockChannel(incomingSocket, + new InetSocketAddress(incomingSocket.getLocalAddress(), incomingSocket.getPort()), profile, + workerChannels::remove); //establish a happens-before edge between closing and accepting a new connection workerChannels.add(incomingChannel);