From f18b0d293c325a28e578df6a965a8881dcf7850d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 16 Jun 2017 22:34:11 +0200 Subject: [PATCH] Move TransportStats accounting into TcpTransport (#25251) Today TcpTransport is the de-facto base-class for transport implementations. The need for all the callbacks we have in TransportServiceAdaptor are not necessary anymore since we can simply have the logic inside the base class itself. This change moves the stats metrics directly into TcpTransport removing the need for low level bytes send / received callbacks. --- .../elasticsearch/transport/TcpTransport.java | 76 +++++-- .../elasticsearch/transport/Transport.java | 7 +- .../transport/TransportService.java | 18 +- .../transport/TransportServiceAdapter.java | 4 - .../transport/FailAndRetryMockTransport.java | 11 +- .../cluster/NodeConnectionsServiceTests.java | 13 +- .../transport/TCPTransportTests.java | 2 +- .../netty4/Netty4MessageChannelHandler.java | 14 -- .../transport/netty4/Netty4Transport.java | 2 +- .../test/transport/CapturingTransport.java | 11 +- .../test/transport/MockTransportService.java | 11 +- .../AbstractSimpleTransportTestCase.java | 190 ++++++++++++++++++ .../transport/MockTcpTransport.java | 6 +- 13 files changed, 280 insertions(+), 85 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 22aced389f8a9..31d871a2ae8dc 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -50,6 +50,7 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; @@ -181,6 +182,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i private final CounterMetric numHandshakes = new CounterMetric(); private static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake"; + private final MeanMetric readBytesMetric = new MeanMetric(); + private final MeanMetric transmittedBytesMetric = new MeanMetric(); + public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { @@ -300,14 +304,14 @@ protected void doRunInLifecycle() throws Exception { DiscoveryNode node = entry.getKey(); NodeChannels channels = entry.getValue(); for (Channel channel : channels.getChannels()) { - internalSendMessage(channel, pingHeader, new NotifyOnceListener() { + internalSendMessage(channel, pingHeader, new SendMetricListener(pingHeader.length()) { @Override - public void innerOnResponse(Channel channel) { + protected void innerInnerOnResponse(Channel channel) { successfulPings.inc(); } @Override - public void innerOnFailure(Exception e) { + protected void innerOnFailure(Exception e) { if (isOpen(channel)) { logger.debug( (Supplier) () -> new ParameterizedMessage("[{}] failed to send ping transport message", node), e); @@ -984,9 +988,10 @@ protected void onException(Channel channel, Exception e) { } else if (e instanceof TcpTransport.HttpOnTransportException) { // in case we are able to return data, serialize the exception content and sent it back to the client if (isOpen(channel)) { - final NotifyOnceListener closeChannel = new NotifyOnceListener() { + BytesArray message = new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8)); + final SendMetricListener closeChannel = new SendMetricListener(message.length()) { @Override - public void innerOnResponse(Channel channel) { + protected void innerInnerOnResponse(Channel channel) { try { closeChannels(Collections.singletonList(channel)); } catch (IOException e1) { @@ -995,7 +1000,7 @@ public void innerOnResponse(Channel channel) { } @Override - public void innerOnFailure(Exception e) { + protected void innerOnFailure(Exception e) { try { closeChannels(Collections.singletonList(channel)); } catch (IOException e1) { @@ -1004,7 +1009,7 @@ public void innerOnFailure(Exception e) { } } }; - internalSendMessage(channel, new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8)), closeChannel); + internalSendMessage(channel, message, closeChannel); } } else { logger.warn( @@ -1086,7 +1091,7 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target final TransportRequestOptions finalOptions = options; // this might be called in a different thread SendListener onRequestSent = new SendListener(stream, - () -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions)); + () -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions), message.length()); internalSendMessage(targetChannel, message, onRequestSent); addedReleaseListener = true; } finally { @@ -1099,7 +1104,7 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target /** * sends a message to the given channel, using the given callbacks. */ - private void internalSendMessage(Channel targetChannel, BytesReference message, NotifyOnceListener listener) { + private void internalSendMessage(Channel targetChannel, BytesReference message, SendMetricListener listener) { try { sendMessage(targetChannel, message, listener); } catch (Exception ex) { @@ -1131,9 +1136,10 @@ public void sendErrorResponse(Version nodeVersion, Channel channel, final Except status = TransportStatus.setError(status); final BytesReference bytes = stream.bytes(); final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length()); + CompositeBytesReference message = new CompositeBytesReference(header, bytes); SendListener onResponseSent = new SendListener(null, - () -> transportServiceAdapter.onResponseSent(requestId, action, error)); - internalSendMessage(channel, new CompositeBytesReference(header, bytes), onResponseSent); + () -> transportServiceAdapter.onResponseSent(requestId, action, error), message.length()); + internalSendMessage(channel, message, onResponseSent); } } @@ -1162,13 +1168,13 @@ private void sendResponse(Version nodeVersion, Channel channel, final TransportR } threadPool.getThreadContext().writeTo(stream); stream.setVersion(nodeVersion); - BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream); + BytesReference message = buildMessage(requestId, status, nodeVersion, response, stream); final TransportResponseOptions finalOptions = options; // this might be called in a different thread SendListener listener = new SendListener(stream, - () -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions)); - internalSendMessage(channel, reference, listener); + () -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions), message.length()); + internalSendMessage(channel, message, listener); addedReleaseListener = true; } finally { if (!addedReleaseListener) { @@ -1324,7 +1330,7 @@ public HttpOnTransportException(StreamInput in) throws IOException { public final void messageReceived(BytesReference reference, Channel channel, String profileName, InetSocketAddress remoteAddress, int messageLengthBytes) throws IOException { final int totalMessageSize = messageLengthBytes + TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE; - transportServiceAdapter.addBytesReceived(totalMessageSize); + readBytesMetric.inc(totalMessageSize); // we have additional bytes to read, outside of the header boolean hasMessageBytesToRead = (totalMessageSize - TcpHeader.HEADER_SIZE) > 0; StreamInput streamIn = reference.streamInput(); @@ -1662,22 +1668,42 @@ protected final void ensureOpen() { } } - private final class SendListener extends NotifyOnceListener { + /** + * This listener increments the transmitted bytes metric on success. + */ + private abstract class SendMetricListener extends NotifyOnceListener { + private final long messageSize; + + private SendMetricListener(long messageSize) { + this.messageSize = messageSize; + } + + @Override + protected final void innerOnResponse(T object) { + transmittedBytesMetric.inc(messageSize); + innerInnerOnResponse(object); + } + + protected abstract void innerInnerOnResponse(T object); + } + + private final class SendListener extends SendMetricListener { private final Releasable optionalReleasable; private final Runnable transportAdaptorCallback; - private SendListener(Releasable optionalReleasable, Runnable transportAdaptorCallback) { + private SendListener(Releasable optionalReleasable, Runnable transportAdaptorCallback, long messageLength) { + super(messageLength); this.optionalReleasable = optionalReleasable; this.transportAdaptorCallback = transportAdaptorCallback; } @Override - public void innerOnResponse(Channel channel) { + protected void innerInnerOnResponse(Channel channel) { release(); } @Override - public void innerOnFailure(Exception e) { + protected void innerOnFailure(Exception e) { release(); } @@ -1701,4 +1727,16 @@ final int getNumOpenConnections() { final int getNumConnectedNodes() { return connectedNodes.size(); } + + /** + * Returns count of currently open connections + */ + protected abstract long getNumOpenServerConnections(); + + @Override + public final TransportStats getStats() { + return new TransportStats( + getNumOpenServerConnections(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(), + transmittedBytesMetric.sum()); + } } diff --git a/core/src/main/java/org/elasticsearch/transport/Transport.java b/core/src/main/java/org/elasticsearch/transport/Transport.java index a32289332ead5..5d22e156d9d13 100644 --- a/core/src/main/java/org/elasticsearch/transport/Transport.java +++ b/core/src/main/java/org/elasticsearch/transport/Transport.java @@ -75,11 +75,6 @@ void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile, */ void disconnectFromNode(DiscoveryNode node); - /** - * Returns count of currently open connections - */ - long serverOpen(); - List getLocalAddresses(); default CircuitBreaker getInFlightRequestBreaker() { @@ -110,6 +105,8 @@ default CircuitBreaker getInFlightRequestBreaker() { */ Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException; + TransportStats getStats(); + /** * A unidirectional connection to a {@link DiscoveryNode} */ diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index e5382e4e2617b..0a4745cda7995 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -203,8 +203,6 @@ void setTracerLogExclude(List tracerLogExclude) { @Override protected void doStart() { - adapter.rxMetric.clear(); - adapter.txMetric.clear(); transport.transportServiceAdapter(adapter); transport.start(); @@ -292,8 +290,7 @@ public TransportInfo info() { } public TransportStats stats() { - return new TransportStats( - transport.serverOpen(), adapter.rxMetric.count(), adapter.rxMetric.sum(), adapter.txMetric.count(), adapter.txMetric.sum()); + return transport.getStats(); } public BoundTransportAddress boundAddress() { @@ -738,19 +735,6 @@ protected RequestHandlerRegistry getRequestHandler(String action) { protected class Adapter implements TransportServiceAdapter { - final MeanMetric rxMetric = new MeanMetric(); - final MeanMetric txMetric = new MeanMetric(); - - @Override - public void addBytesReceived(long size) { - rxMetric.inc(size); - } - - @Override - public void addBytesSent(long size) { - txMetric.inc(size); - } - @Override public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) { diff --git a/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java b/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java index 70748b01a6802..24a71a99998a4 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java @@ -23,10 +23,6 @@ public interface TransportServiceAdapter extends TransportConnectionListener { - void addBytesReceived(long size); - - void addBytesSent(long size); - /** called by the {@link Transport} implementation once a request has been sent */ void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options); diff --git a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index 142216bf2dd39..dbe858982090c 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -41,6 +41,7 @@ import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportServiceAdapter; +import org.elasticsearch.transport.TransportStats; import java.io.IOException; import java.net.UnknownHostException; @@ -193,11 +194,6 @@ public void disconnectFromNode(DiscoveryNode node) { } - @Override - public long serverOpen() { - return 0; - } - @Override public Lifecycle.State lifecycleState() { return null; @@ -231,4 +227,9 @@ public Map profileBoundAddresses() { public long newRequestId() { return requestId.incrementAndGet(); } + + @Override + public TransportStats getStats() { + throw new UnsupportedOperationException(); + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index a1b80803e0c7b..2e7a857cc7bc9 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportServiceAdapter; +import org.elasticsearch.transport.TransportStats; import org.junit.After; import org.junit.Before; @@ -241,11 +242,6 @@ public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) return getConnection(node); } - @Override - public long serverOpen() { - return 0; - } - @Override public List getLocalAddresses() { return null; @@ -263,12 +259,10 @@ public Lifecycle.State lifecycleState() { @Override public void addLifecycleListener(LifecycleListener listener) { - } @Override public void removeLifecycleListener(LifecycleListener listener) { - } @Override @@ -279,5 +273,10 @@ public void stop() {} @Override public void close() {} + + @Override + public TransportStats getStats() { + throw new UnsupportedOperationException(); + } } } diff --git a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java index a68416cc25a6c..6ce6c2a96d604 100644 --- a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java @@ -235,7 +235,7 @@ protected boolean isOpen(Object o) { } @Override - public long serverOpen() { + public long getNumOpenServerConnections() { return 0; } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java index e83cfc62fda08..9763a5116b163 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java @@ -22,11 +22,8 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.transport.TcpHeader; -import org.elasticsearch.transport.TcpTransport; -import org.elasticsearch.transport.TransportServiceAdapter; import org.elasticsearch.transport.Transports; import java.net.InetSocketAddress; @@ -37,25 +34,14 @@ */ final class Netty4MessageChannelHandler extends ChannelDuplexHandler { - private final TransportServiceAdapter transportServiceAdapter; private final Netty4Transport transport; private final String profileName; Netty4MessageChannelHandler(Netty4Transport transport, String profileName) { - this.transportServiceAdapter = transport.transportServiceAdapter(); this.transport = transport; this.profileName = profileName; } - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (msg instanceof ByteBuf && transportServiceAdapter != null) { - // record the number of bytes send on the channel - promise.addListener(f -> transportServiceAdapter.addBytesSent(((ByteBuf) msg).readableBytes())); - } - ctx.write(msg, promise); - } - @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Transports.assertTransportThread(); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index abe0739c243f1..140041b53b7c1 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -306,7 +306,7 @@ protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) } @Override - public long serverOpen() { + public long getNumOpenServerConnections() { Netty4OpenChannelsHandler channels = serverOpenChannels; return channels == null ? 0 : channels.numberOfOpenChannels(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index 55519ec2af22a..2ccddf6bc5437 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -40,6 +40,7 @@ import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportServiceAdapter; +import org.elasticsearch.transport.TransportStats; import java.io.IOException; import java.io.UncheckedIOException; @@ -213,6 +214,11 @@ public void close() throws IOException { }; } + @Override + public TransportStats getStats() { + throw new UnsupportedOperationException(); + } + @Override public void transportServiceAdapter(TransportServiceAdapter adapter) { this.adapter = adapter; @@ -250,11 +256,6 @@ public void disconnectFromNode(DiscoveryNode node) { } - @Override - public long serverOpen() { - return 0; - } - @Override public Lifecycle.State lifecycleState() { return null; diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 210190940d229..467b2c7f3c8fd 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -56,6 +56,7 @@ import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportServiceAdapter; +import org.elasticsearch.transport.TransportStats; import java.io.IOException; import java.net.UnknownHostException; @@ -572,11 +573,6 @@ public void disconnectFromNode(DiscoveryNode node) { transport.disconnectFromNode(node); } - @Override - public long serverOpen() { - return transport.serverOpen(); - } - @Override public List getLocalAddresses() { return transport.getLocalAddresses(); @@ -609,6 +605,11 @@ public void sendRequest(long requestId, String action, TransportRequest request, }; } + @Override + public TransportStats getStats() { + return transport.getStats(); + } + @Override public Lifecycle.State lifecycleState() { return transport.lifecycleState(); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 99704235cc734..e4f2fbae91798 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -2252,4 +2252,194 @@ public String executor() { assertPendingConnections(0, serviceC.getOriginalTransport()); } + public void testTransportStats() throws IOException, InterruptedException { + MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true); + CountDownLatch receivedLatch = new CountDownLatch(1); + CountDownLatch sendResponseLatch = new CountDownLatch(1); + serviceB.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME, + (request, channel) -> { + // don't block on a network thread here + threadPool.generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (IOException e1) { + throw new UncheckedIOException(e1); + } + } + + @Override + protected void doRun() throws Exception { + receivedLatch.countDown(); + sendResponseLatch.await(); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + }); + }); + serviceC.start(); + serviceC.acceptIncomingRequests(); + CountDownLatch responseLatch = new CountDownLatch(1); + TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { + @Override + public TransportResponse newInstance() { + return TransportResponse.Empty.INSTANCE; + } + + @Override + public void handleResponse(TransportResponse response) { + responseLatch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + responseLatch.countDown(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }; + + TransportStats stats = serviceC.transport.getStats(); // nothing transmitted / read yet + assertEquals(0, stats.getRxCount()); + assertEquals(0, stats.getTxCount()); + assertEquals(0, stats.getRxSize().getBytes()); + assertEquals(0, stats.getTxSize().getBytes()); + + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.addConnections(1, + TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.PING, + TransportRequestOptions.Type.RECOVERY, + TransportRequestOptions.Type.REG, + TransportRequestOptions.Type.STATE); + try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) { + stats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake + assertEquals(1, stats.getRxCount()); + assertEquals(1, stats.getTxCount()); + assertEquals(25, stats.getRxSize().getBytes()); + assertEquals(45, stats.getTxSize().getBytes()); + serviceC.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY, + transportResponseHandler); + receivedLatch.await(); + stats = serviceC.transport.getStats(); // request has ben send + assertEquals(1, stats.getRxCount()); + assertEquals(2, stats.getTxCount()); + assertEquals(25, stats.getRxSize().getBytes()); + assertEquals(91, stats.getTxSize().getBytes()); + sendResponseLatch.countDown(); + responseLatch.await(); + stats = serviceC.transport.getStats(); // response has been received + assertEquals(2, stats.getRxCount()); + assertEquals(2, stats.getTxCount()); + assertEquals(46, stats.getRxSize().getBytes()); + assertEquals(91, stats.getTxSize().getBytes()); + } finally { + try { + assertPendingConnections(0, serviceC.getOriginalTransport()); + } finally { + serviceC.close(); + } + } + } + + public void testTransportStatsWithException() throws IOException, InterruptedException { + MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true); + CountDownLatch receivedLatch = new CountDownLatch(1); + CountDownLatch sendResponseLatch = new CountDownLatch(1); + Exception ex = new RuntimeException("boom"); + ex.setStackTrace(new StackTraceElement[0]); + serviceB.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME, + (request, channel) -> { + // don't block on a network thread here + threadPool.generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (IOException e1) { + throw new UncheckedIOException(e1); + } + } + + @Override + protected void doRun() throws Exception { + receivedLatch.countDown(); + sendResponseLatch.await(); + onFailure(ex); + } + }); + }); + serviceC.start(); + serviceC.acceptIncomingRequests(); + CountDownLatch responseLatch = new CountDownLatch(1); + TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { + @Override + public TransportResponse newInstance() { + return TransportResponse.Empty.INSTANCE; + } + + @Override + public void handleResponse(TransportResponse response) { + responseLatch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + responseLatch.countDown(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }; + + TransportStats stats = serviceC.transport.getStats(); // nothing transmitted / read yet + assertEquals(0, stats.getRxCount()); + assertEquals(0, stats.getTxCount()); + assertEquals(0, stats.getRxSize().getBytes()); + assertEquals(0, stats.getTxSize().getBytes()); + + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.addConnections(1, + TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.PING, + TransportRequestOptions.Type.RECOVERY, + TransportRequestOptions.Type.REG, + TransportRequestOptions.Type.STATE); + try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) { + stats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake + assertEquals(1, stats.getRxCount()); + assertEquals(1, stats.getTxCount()); + assertEquals(25, stats.getRxSize().getBytes()); + assertEquals(45, stats.getTxSize().getBytes()); + serviceC.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY, + transportResponseHandler); + receivedLatch.await(); + stats = serviceC.transport.getStats(); // request has ben send + assertEquals(1, stats.getRxCount()); + assertEquals(2, stats.getTxCount()); + assertEquals(25, stats.getRxSize().getBytes()); + assertEquals(91, stats.getTxSize().getBytes()); + sendResponseLatch.countDown(); + responseLatch.await(); + stats = serviceC.transport.getStats(); // exception response has been received + assertEquals(2, stats.getRxCount()); + assertEquals(2, stats.getTxCount()); + int addressLen = serviceB.boundAddress().publishAddress().address().getAddress().getAddress().length; + // if we are bound to a IPv6 address the response address is serialized with the exception so it will be different depending + // on the stack. The emphemeral port will always be in the same range + assertEquals(185 + addressLen, stats.getRxSize().getBytes()); + assertEquals(91, stats.getTxSize().getBytes()); + } finally { + try { + assertPendingConnections(0, serviceC.getOriginalTransport()); + } finally { + serviceC.close(); + } + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 38a1701a7e1ce..94f5351cae789 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -248,7 +248,7 @@ protected void closeChannels(List channel) throws IOException { } @Override - public long serverOpen() { + public long getNumOpenServerConnections() { return 1; } @@ -306,7 +306,9 @@ public void accept(Executor executor) throws IOException { configureSocket(incomingSocket); synchronized (this) { if (isOpen.get()) { - incomingChannel = new MockChannel(incomingSocket, localAddress, profile, workerChannels::remove); + incomingChannel = new MockChannel(incomingSocket, + new InetSocketAddress(incomingSocket.getLocalAddress(), incomingSocket.getPort()), profile, + workerChannels::remove); //establish a happens-before edge between closing and accepting a new connection workerChannels.add(incomingChannel);