Skip to content

Commit

Permalink
Move TransportStats accounting into TcpTransport (#25251)
Browse files Browse the repository at this point in the history
Today TcpTransport is the de-facto base-class for transport implementations.
The need for all the callbacks we have in TransportServiceAdaptor are not necessary
anymore since we can simply have the logic inside the base class itself. This change
moves the stats metrics directly into TcpTransport removing the need for low level
bytes send / received callbacks.
  • Loading branch information
s1monw authored Jun 16, 2017
1 parent ecc87f6 commit f18b0d2
Show file tree
Hide file tree
Showing 13 changed files with 280 additions and 85 deletions.
76 changes: 57 additions & 19 deletions core/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
Expand Down Expand Up @@ -181,6 +182,9 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
private final CounterMetric numHandshakes = new CounterMetric();
private static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake";

private final MeanMetric readBytesMetric = new MeanMetric();
private final MeanMetric transmittedBytesMetric = new MeanMetric();

public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays,
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
Expand Down Expand Up @@ -300,14 +304,14 @@ protected void doRunInLifecycle() throws Exception {
DiscoveryNode node = entry.getKey();
NodeChannels channels = entry.getValue();
for (Channel channel : channels.getChannels()) {
internalSendMessage(channel, pingHeader, new NotifyOnceListener<Channel>() {
internalSendMessage(channel, pingHeader, new SendMetricListener<Channel>(pingHeader.length()) {
@Override
public void innerOnResponse(Channel channel) {
protected void innerInnerOnResponse(Channel channel) {
successfulPings.inc();
}

@Override
public void innerOnFailure(Exception e) {
protected void innerOnFailure(Exception e) {
if (isOpen(channel)) {
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage("[{}] failed to send ping transport message", node), e);
Expand Down Expand Up @@ -984,9 +988,10 @@ protected void onException(Channel channel, Exception e) {
} else if (e instanceof TcpTransport.HttpOnTransportException) {
// in case we are able to return data, serialize the exception content and sent it back to the client
if (isOpen(channel)) {
final NotifyOnceListener<Channel> closeChannel = new NotifyOnceListener<Channel>() {
BytesArray message = new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8));
final SendMetricListener<Channel> closeChannel = new SendMetricListener<Channel>(message.length()) {
@Override
public void innerOnResponse(Channel channel) {
protected void innerInnerOnResponse(Channel channel) {
try {
closeChannels(Collections.singletonList(channel));
} catch (IOException e1) {
Expand All @@ -995,7 +1000,7 @@ public void innerOnResponse(Channel channel) {
}

@Override
public void innerOnFailure(Exception e) {
protected void innerOnFailure(Exception e) {
try {
closeChannels(Collections.singletonList(channel));
} catch (IOException e1) {
Expand All @@ -1004,7 +1009,7 @@ public void innerOnFailure(Exception e) {
}
}
};
internalSendMessage(channel, new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8)), closeChannel);
internalSendMessage(channel, message, closeChannel);
}
} else {
logger.warn(
Expand Down Expand Up @@ -1086,7 +1091,7 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target
final TransportRequestOptions finalOptions = options;
// this might be called in a different thread
SendListener onRequestSent = new SendListener(stream,
() -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions));
() -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions), message.length());
internalSendMessage(targetChannel, message, onRequestSent);
addedReleaseListener = true;
} finally {
Expand All @@ -1099,7 +1104,7 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target
/**
* sends a message to the given channel, using the given callbacks.
*/
private void internalSendMessage(Channel targetChannel, BytesReference message, NotifyOnceListener<Channel> listener) {
private void internalSendMessage(Channel targetChannel, BytesReference message, SendMetricListener<Channel> listener) {
try {
sendMessage(targetChannel, message, listener);
} catch (Exception ex) {
Expand Down Expand Up @@ -1131,9 +1136,10 @@ public void sendErrorResponse(Version nodeVersion, Channel channel, final Except
status = TransportStatus.setError(status);
final BytesReference bytes = stream.bytes();
final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length());
CompositeBytesReference message = new CompositeBytesReference(header, bytes);
SendListener onResponseSent = new SendListener(null,
() -> transportServiceAdapter.onResponseSent(requestId, action, error));
internalSendMessage(channel, new CompositeBytesReference(header, bytes), onResponseSent);
() -> transportServiceAdapter.onResponseSent(requestId, action, error), message.length());
internalSendMessage(channel, message, onResponseSent);
}
}

Expand Down Expand Up @@ -1162,13 +1168,13 @@ private void sendResponse(Version nodeVersion, Channel channel, final TransportR
}
threadPool.getThreadContext().writeTo(stream);
stream.setVersion(nodeVersion);
BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream);
BytesReference message = buildMessage(requestId, status, nodeVersion, response, stream);

final TransportResponseOptions finalOptions = options;
// this might be called in a different thread
SendListener listener = new SendListener(stream,
() -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions));
internalSendMessage(channel, reference, listener);
() -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions), message.length());
internalSendMessage(channel, message, listener);
addedReleaseListener = true;
} finally {
if (!addedReleaseListener) {
Expand Down Expand Up @@ -1324,7 +1330,7 @@ public HttpOnTransportException(StreamInput in) throws IOException {
public final void messageReceived(BytesReference reference, Channel channel, String profileName,
InetSocketAddress remoteAddress, int messageLengthBytes) throws IOException {
final int totalMessageSize = messageLengthBytes + TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE;
transportServiceAdapter.addBytesReceived(totalMessageSize);
readBytesMetric.inc(totalMessageSize);
// we have additional bytes to read, outside of the header
boolean hasMessageBytesToRead = (totalMessageSize - TcpHeader.HEADER_SIZE) > 0;
StreamInput streamIn = reference.streamInput();
Expand Down Expand Up @@ -1662,22 +1668,42 @@ protected final void ensureOpen() {
}
}

private final class SendListener extends NotifyOnceListener<Channel> {
/**
* This listener increments the transmitted bytes metric on success.
*/
private abstract class SendMetricListener<T> extends NotifyOnceListener<T> {
private final long messageSize;

private SendMetricListener(long messageSize) {
this.messageSize = messageSize;
}

@Override
protected final void innerOnResponse(T object) {
transmittedBytesMetric.inc(messageSize);
innerInnerOnResponse(object);
}

protected abstract void innerInnerOnResponse(T object);
}

private final class SendListener extends SendMetricListener<Channel> {
private final Releasable optionalReleasable;
private final Runnable transportAdaptorCallback;

private SendListener(Releasable optionalReleasable, Runnable transportAdaptorCallback) {
private SendListener(Releasable optionalReleasable, Runnable transportAdaptorCallback, long messageLength) {
super(messageLength);
this.optionalReleasable = optionalReleasable;
this.transportAdaptorCallback = transportAdaptorCallback;
}

@Override
public void innerOnResponse(Channel channel) {
protected void innerInnerOnResponse(Channel channel) {
release();
}

@Override
public void innerOnFailure(Exception e) {
protected void innerOnFailure(Exception e) {
release();
}

Expand All @@ -1701,4 +1727,16 @@ final int getNumOpenConnections() {
final int getNumConnectedNodes() {
return connectedNodes.size();
}

/**
* Returns count of currently open connections
*/
protected abstract long getNumOpenServerConnections();

@Override
public final TransportStats getStats() {
return new TransportStats(
getNumOpenServerConnections(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(),
transmittedBytesMetric.sum());
}
}
7 changes: 2 additions & 5 deletions core/src/main/java/org/elasticsearch/transport/Transport.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,6 @@ void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
*/
void disconnectFromNode(DiscoveryNode node);

/**
* Returns count of currently open connections
*/
long serverOpen();

List<String> getLocalAddresses();

default CircuitBreaker getInFlightRequestBreaker() {
Expand Down Expand Up @@ -110,6 +105,8 @@ default CircuitBreaker getInFlightRequestBreaker() {
*/
Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException;

TransportStats getStats();

/**
* A unidirectional connection to a {@link DiscoveryNode}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,6 @@ void setTracerLogExclude(List<String> tracerLogExclude) {

@Override
protected void doStart() {
adapter.rxMetric.clear();
adapter.txMetric.clear();
transport.transportServiceAdapter(adapter);
transport.start();

Expand Down Expand Up @@ -292,8 +290,7 @@ public TransportInfo info() {
}

public TransportStats stats() {
return new TransportStats(
transport.serverOpen(), adapter.rxMetric.count(), adapter.rxMetric.sum(), adapter.txMetric.count(), adapter.txMetric.sum());
return transport.getStats();
}

public BoundTransportAddress boundAddress() {
Expand Down Expand Up @@ -738,19 +735,6 @@ protected RequestHandlerRegistry getRequestHandler(String action) {

protected class Adapter implements TransportServiceAdapter {

final MeanMetric rxMetric = new MeanMetric();
final MeanMetric txMetric = new MeanMetric();

@Override
public void addBytesReceived(long size) {
rxMetric.inc(size);
}

@Override
public void addBytesSent(long size) {
txMetric.inc(size);
}

@Override
public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@

public interface TransportServiceAdapter extends TransportConnectionListener {

void addBytesReceived(long size);

void addBytesSent(long size);

/** called by the {@link Transport} implementation once a request has been sent */
void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.TransportStats;

import java.io.IOException;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -193,11 +194,6 @@ public void disconnectFromNode(DiscoveryNode node) {

}

@Override
public long serverOpen() {
return 0;
}

@Override
public Lifecycle.State lifecycleState() {
return null;
Expand Down Expand Up @@ -231,4 +227,9 @@ public Map<String, BoundTransportAddress> profileBoundAddresses() {
public long newRequestId() {
return requestId.incrementAndGet();
}

@Override
public TransportStats getStats() {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.TransportStats;
import org.junit.After;
import org.junit.Before;

Expand Down Expand Up @@ -241,11 +242,6 @@ public Connection openConnection(DiscoveryNode node, ConnectionProfile profile)
return getConnection(node);
}

@Override
public long serverOpen() {
return 0;
}

@Override
public List<String> getLocalAddresses() {
return null;
Expand All @@ -263,12 +259,10 @@ public Lifecycle.State lifecycleState() {

@Override
public void addLifecycleListener(LifecycleListener listener) {

}

@Override
public void removeLifecycleListener(LifecycleListener listener) {

}

@Override
Expand All @@ -279,5 +273,10 @@ public void stop() {}

@Override
public void close() {}

@Override
public TransportStats getStats() {
throw new UnsupportedOperationException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ protected boolean isOpen(Object o) {
}

@Override
public long serverOpen() {
public long getNumOpenServerConnections() {
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.transport.TcpHeader;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.Transports;

import java.net.InetSocketAddress;
Expand All @@ -37,25 +34,14 @@
*/
final class Netty4MessageChannelHandler extends ChannelDuplexHandler {

private final TransportServiceAdapter transportServiceAdapter;
private final Netty4Transport transport;
private final String profileName;

Netty4MessageChannelHandler(Netty4Transport transport, String profileName) {
this.transportServiceAdapter = transport.transportServiceAdapter();
this.transport = transport;
this.profileName = profileName;
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBuf && transportServiceAdapter != null) {
// record the number of bytes send on the channel
promise.addListener(f -> transportServiceAdapter.addBytesSent(((ByteBuf) msg).readableBytes()));
}
ctx.write(msg, promise);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Transports.assertTransportThread();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
}

@Override
public long serverOpen() {
public long getNumOpenServerConnections() {
Netty4OpenChannelsHandler channels = serverOpenChannels;
return channels == null ? 0 : channels.numberOfOpenChannels();
}
Expand Down
Loading

0 comments on commit f18b0d2

Please sign in to comment.