From 93c510b3d91030a2e0a74a46062d373d757f5fb2 Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Wed, 20 Sep 2023 22:05:37 +0530 Subject: [PATCH 1/9] Add instrumentation in InboundHandler Signed-off-by: Gagan Juneja --- .../transport/Netty4ModulePlugin.java | 6 +- .../transport/netty4/Netty4Transport.java | 6 +- .../Netty4SizeHeaderFrameDecoderTests.java | 4 +- .../netty4/NettyTransportMultiPortTests.java | 4 +- .../netty4/SimpleNetty4TransportTests.java | 4 +- .../discovery/ec2/Ec2DiscoveryTests.java | 3 +- .../discovery/ec2/Ec2RetriesTests.java | 3 +- .../transport/nio/NioTransport.java | 6 +- .../transport/nio/NioTransportPlugin.java | 6 +- .../nio/SimpleNioTransportTests.java | 4 +- .../common/network/NetworkModule.java | 3 +- .../org/opensearch/plugins/NetworkPlugin.java | 3 +- .../telemetry/tracing/AttributeNames.java | 5 + .../telemetry/tracing/SpanBuilder.java | 21 +++ .../tracing/channels/TraceableTcpChannel.java | 120 ++++++++++++++ .../opensearch/transport/InboundHandler.java | 156 ++++++++++-------- .../opensearch/transport/TcpTransport.java | 7 +- .../node/tasks/TaskManagerTestCase.java | 3 +- ...TransportResyncReplicationActionTests.java | 3 +- .../BroadcastReplicationTests.java | 3 +- .../TransportReplicationActionTests.java | 3 +- .../common/network/NetworkModuleTests.java | 9 +- .../FileBasedSeedHostsProviderTests.java | 3 +- .../discovery/SeedHostsResolverTests.java | 15 +- .../extensions/ExtensionsManagerTests.java | 3 +- ...ExtensionTransportActionsHandlerTests.java | 3 +- .../RestInitializeExtensionActionTests.java | 3 +- .../rest/RestSendToExtensionActionTests.java | 3 +- .../transport/InboundHandlerTests.java | 4 +- .../transport/TcpTransportTests.java | 4 +- .../TransportServiceHandshakeTests.java | 3 +- .../test/transport/MockTransportService.java | 4 +- .../transport/nio/MockNioTransport.java | 6 +- .../transport/nio/MockNioTransportPlugin.java | 7 +- .../nio/SimpleMockNioTransportTests.java | 4 +- 35 files changed, 328 insertions(+), 116 deletions(-) create mode 100644 server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpChannel.java diff --git a/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4ModulePlugin.java b/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4ModulePlugin.java index ca51d70702a82..1a34dfd2c9ee4 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4ModulePlugin.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4ModulePlugin.java @@ -96,7 +96,8 @@ public Map> getTransports( PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService + NetworkService networkService, + Tracer tracer ) { return Collections.singletonMap( NETTY_TRANSPORT_NAME, @@ -108,7 +109,8 @@ public Map> getTransports( pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, - getSharedGroupFactory(settings) + getSharedGroupFactory(settings), + tracer ) ); } diff --git a/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4Transport.java index f77c29c8bfa60..e76a227630dc1 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4Transport.java @@ -50,6 +50,7 @@ import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.indices.breaker.CircuitBreakerService; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Netty4NioSocketChannel; import org.opensearch.transport.NettyAllocator; @@ -131,9 +132,10 @@ public Netty4Transport( PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService, - SharedGroupFactory sharedGroupFactory + SharedGroupFactory sharedGroupFactory, + Tracer tracer ) { - super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); + super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService, tracer); Netty4Utils.setAvailableProcessors(OpenSearchExecutors.NODE_PROCESSORS_SETTING.get(settings)); NettyAllocator.logAllocatorDescriptionIfNeeded(); this.sharedGroupFactory = sharedGroupFactory; diff --git a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java b/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java index 3e5f71f1464a1..c92ccba82835f 100644 --- a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java +++ b/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java @@ -40,6 +40,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.SharedGroupFactory; @@ -86,7 +87,8 @@ public void startThreadPool() { recycler, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService(), - new SharedGroupFactory(settings) + new SharedGroupFactory(settings), + NoopTracer.INSTANCE ); nettyTransport.start(); diff --git a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/NettyTransportMultiPortTests.java b/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/NettyTransportMultiPortTests.java index 98a001b8ae4bb..7cca00db68559 100644 --- a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/NettyTransportMultiPortTests.java +++ b/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/NettyTransportMultiPortTests.java @@ -40,6 +40,7 @@ import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -141,7 +142,8 @@ private TcpTransport startTransport(Settings settings, ThreadPool threadPool) { recycler, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService(), - new SharedGroupFactory(settings) + new SharedGroupFactory(settings), + NoopTracer.INSTANCE ); transport.start(); diff --git a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/SimpleNetty4TransportTests.java index 35b19002dce8d..710b3ff6bd0ca 100644 --- a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/SimpleNetty4TransportTests.java @@ -44,6 +44,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.transport.MockTransportService; import org.opensearch.test.transport.StubbableTransport; import org.opensearch.transport.AbstractSimpleTransportTestCase; @@ -82,7 +83,8 @@ protected Transport build(Settings settings, final Version version, ClusterSetti PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService(), - new SharedGroupFactory(settings) + new SharedGroupFactory(settings), + NoopTracer.INSTANCE ) { @Override diff --git a/plugins/discovery-ec2/src/test/java/org/opensearch/discovery/ec2/Ec2DiscoveryTests.java b/plugins/discovery-ec2/src/test/java/org/opensearch/discovery/ec2/Ec2DiscoveryTests.java index 6bed6564cfd36..02e1ff40f7ed6 100644 --- a/plugins/discovery-ec2/src/test/java/org/opensearch/discovery/ec2/Ec2DiscoveryTests.java +++ b/plugins/discovery-ec2/src/test/java/org/opensearch/discovery/ec2/Ec2DiscoveryTests.java @@ -92,7 +92,8 @@ protected MockTransportService createTransportService() { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, writableRegistry(), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ) { @Override public TransportAddress[] addressesFromString(String address) { diff --git a/plugins/discovery-ec2/src/test/java/org/opensearch/discovery/ec2/Ec2RetriesTests.java b/plugins/discovery-ec2/src/test/java/org/opensearch/discovery/ec2/Ec2RetriesTests.java index 3311ddc8842f2..ce097667f9c4b 100644 --- a/plugins/discovery-ec2/src/test/java/org/opensearch/discovery/ec2/Ec2RetriesTests.java +++ b/plugins/discovery-ec2/src/test/java/org/opensearch/discovery/ec2/Ec2RetriesTests.java @@ -77,7 +77,8 @@ protected MockTransportService createTransportService() { networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ), threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, diff --git a/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransport.java index dfa72d6d59a0d..55920bab4efd3 100644 --- a/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransport.java @@ -52,6 +52,7 @@ import org.opensearch.nio.NioSelector; import org.opensearch.nio.NioSocketChannel; import org.opensearch.nio.ServerChannelContext; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TcpTransport; import org.opensearch.transport.TransportSettings; @@ -84,9 +85,10 @@ protected NioTransport( PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService, - NioGroupFactory groupFactory + NioGroupFactory groupFactory, + Tracer tracer ) { - super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); + super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService, tracer); this.pageAllocator = new PageAllocator(pageCacheRecycler); this.groupFactory = groupFactory; } diff --git a/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransportPlugin.java b/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransportPlugin.java index ec266d76eff3d..d4be876867651 100644 --- a/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransportPlugin.java +++ b/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransportPlugin.java @@ -91,7 +91,8 @@ public Map> getTransports( PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService + NetworkService networkService, + Tracer tracer ) { return Collections.singletonMap( NIO_TRANSPORT_NAME, @@ -103,7 +104,8 @@ public Map> getTransports( pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, - getNioGroupFactory(settings) + getNioGroupFactory(settings), + tracer ) ); } diff --git a/plugins/transport-nio/src/test/java/org/opensearch/transport/nio/SimpleNioTransportTests.java b/plugins/transport-nio/src/test/java/org/opensearch/transport/nio/SimpleNioTransportTests.java index 24cc38c17a9d1..f5d1c618f5ace 100644 --- a/plugins/transport-nio/src/test/java/org/opensearch/transport/nio/SimpleNioTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/opensearch/transport/nio/SimpleNioTransportTests.java @@ -44,6 +44,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.transport.MockTransportService; import org.opensearch.test.transport.StubbableTransport; import org.opensearch.transport.AbstractSimpleTransportTestCase; @@ -81,7 +82,8 @@ protected Transport build(Settings settings, final Version version, ClusterSetti new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService(), - new NioGroupFactory(settings, logger) + new NioGroupFactory(settings, logger), + NoopTracer.INSTANCE ) { @Override diff --git a/server/src/main/java/org/opensearch/common/network/NetworkModule.java b/server/src/main/java/org/opensearch/common/network/NetworkModule.java index 8870e26c373e9..0734659d8ee72 100644 --- a/server/src/main/java/org/opensearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/opensearch/common/network/NetworkModule.java @@ -174,7 +174,8 @@ public NetworkModule( pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, - networkService + networkService, + tracer ); for (Map.Entry> entry : transportFactory.entrySet()) { registerTransport(entry.getKey(), entry.getValue()); diff --git a/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java b/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java index f2f8e84f04e02..07df40bafe6a1 100644 --- a/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java @@ -83,7 +83,8 @@ default Map> getTransports( PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService + NetworkService networkService, + Tracer tracer ) { return Collections.emptyMap(); } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java index e86b21ae0fd3b..a9514c298ef88 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java @@ -60,6 +60,11 @@ private AttributeNames() { */ public static final String TRANSPORT_TARGET_HOST = "target_host"; + /** + * Transport Service send request local host. + */ + public static final String TRANSPORT_HOST = "host"; + /** * Action Name. */ diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java index a93ce11f374fe..53c59487049f2 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java @@ -13,6 +13,7 @@ import org.opensearch.http.HttpRequest; import org.opensearch.rest.RestRequest; import org.opensearch.telemetry.tracing.attributes.Attributes; +import org.opensearch.transport.TcpChannel; import org.opensearch.transport.Transport; import java.util.Arrays; @@ -127,4 +128,24 @@ private static Attributes buildSpanAttributes(String action, Transport.Connectio return attributes; } + /** + * Creates {@link SpanCreationContext} from Inbound Handler. + * @param action action. + * @param tcpChannel tcp channel. + * @return context + */ + public static SpanCreationContext from(String action, TcpChannel tcpChannel) { + return new SpanCreationContext(createSpanName(action, tcpChannel), buildSpanAttributes(action, tcpChannel)); + } + + private static String createSpanName(String action, TcpChannel tcpChannel) { + return action + SEPARATOR + tcpChannel.getLocalAddress().getHostString(); + } + + private static Attributes buildSpanAttributes(String action, TcpChannel tcpChannel) { + Attributes attributes = Attributes.create().addAttribute(AttributeNames.TRANSPORT_ACTION, action); + attributes.addAttribute(AttributeNames.TRANSPORT_HOST, tcpChannel.getLocalAddress().getHostString()); + return attributes; + } + } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpChannel.java new file mode 100644 index 0000000000000..cb3ca1ba30d73 --- /dev/null +++ b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpChannel.java @@ -0,0 +1,120 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.channels; + +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.telemetry.tracing.listener.TraceableActionListener; +import org.opensearch.transport.TcpChannel; + +import java.net.InetSocketAddress; + +/** + * Tracer wrapped {@link TcpChannel} + */ +public class TraceableTcpChannel implements TcpChannel { + + private final TcpChannel delegate; + private final Span span; + private final Tracer tracer; + + private final static ActionListener DUMMY_ACTION_LISTENER = ActionListener.wrap(() -> {}); + + /** + * Constructor. + * @param delegate delegate + * @param span span + * @param tracer tracer + */ + public TraceableTcpChannel(TcpChannel delegate, Span span, Tracer tracer) { + this.delegate = delegate; + this.span = span; + this.tracer = tracer; + } + + /** + * Factory method. + * + * @param delegate delegate + * @param span span + * @param tracer tracer + * @return tcp channel + */ + public static TcpChannel create(TcpChannel delegate, final Span span, final Tracer tracer) { + if (FeatureFlags.isEnabled(FeatureFlags.TELEMETRY) == true) { + TraceableTcpChannel traceableTcpChannel = new TraceableTcpChannel(delegate, span, tracer); + traceableTcpChannel.addCloseListener(TraceableActionListener.create(DUMMY_ACTION_LISTENER, span, tracer)); + return traceableTcpChannel; + } else { + return delegate; + } + } + + @Override + public void close() { + try (SpanScope scope = tracer.withSpanInScope(span)) { + delegate.close(); + } finally { + span.endSpan(); + } + } + + @Override + public void addCloseListener(ActionListener listener) { + delegate.addCloseListener(listener); + } + + @Override + public boolean isOpen() { + return delegate.isOpen(); + } + + @Override + public boolean isServerChannel() { + return delegate.isServerChannel(); + } + + @Override + public String getProfile() { + return delegate.getProfile(); + } + + @Override + public InetSocketAddress getLocalAddress() { + return delegate.getLocalAddress(); + } + + @Override + public InetSocketAddress getRemoteAddress() { + return delegate.getRemoteAddress(); + } + + @Override + public void sendMessage(BytesReference reference, ActionListener listener) { + try (SpanScope scope = tracer.withSpanInScope(span)) { + delegate.sendMessage(reference, listener); + } finally { + span.endSpan(); + } + } + + @Override + public void addConnectListener(ActionListener listener) { + delegate.addConnectListener(listener); + } + + @Override + public ChannelStats getChannelStats() { + return delegate.getChannelStats(); + } +} diff --git a/server/src/main/java/org/opensearch/transport/InboundHandler.java b/server/src/main/java/org/opensearch/transport/InboundHandler.java index 9f9232c18079a..020e2cdba4b38 100644 --- a/server/src/main/java/org/opensearch/transport/InboundHandler.java +++ b/server/src/main/java/org/opensearch/transport/InboundHandler.java @@ -46,6 +46,11 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanBuilder; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.telemetry.tracing.channels.TraceableTcpChannel; import org.opensearch.threadpool.ThreadPool; import java.io.EOFException; @@ -74,6 +79,8 @@ public class InboundHandler { private volatile long slowLogThresholdMs = Long.MAX_VALUE; + private final Tracer tracer; + InboundHandler( ThreadPool threadPool, OutboundHandler outboundHandler, @@ -81,7 +88,8 @@ public class InboundHandler { TransportHandshaker handshaker, TransportKeepAlive keepAlive, Transport.RequestHandlers requestHandlers, - Transport.ResponseHandlers responseHandlers + Transport.ResponseHandlers responseHandlers, + Tracer tracer ) { this.threadPool = threadPool; this.outboundHandler = outboundHandler; @@ -90,6 +98,7 @@ public class InboundHandler { this.keepAlive = keepAlive; this.requestHandlers = requestHandlers; this.responseHandlers = responseHandlers; + this.tracer = tracer; } void setMessageListener(TransportMessageListener listener) { @@ -108,7 +117,6 @@ void inboundMessage(TcpChannel channel, InboundMessage message) throws Exception final long startTime = threadPool.relativeTimeInMillis(); channel.getChannelStats().markAccessed(startTime); TransportLogger.logInboundMessage(channel, message); - if (message.isPing()) { keepAlive.receiveKeepAlive(channel); } else { @@ -123,7 +131,6 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st final InetSocketAddress remoteAddress = channel.getRemoteAddress(); final Header header = message.getHeader(); assert header.needsToReadVariableHeader() == false; - ThreadContext threadContext = threadPool.getThreadContext(); try (ThreadContext.StoredContext existing = threadContext.stashContext()) { // Place the context with the headers from the message @@ -165,6 +172,7 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st handleResponse(requestId, remoteAddress, EMPTY_STREAM_INPUT, handler); } } + } } finally { final long took = threadPool.relativeTimeInMillis() - startTime; @@ -184,79 +192,83 @@ private void handleRequest(TcpChannel channel, Head final String action = header.getActionName(); final long requestId = header.getRequestId(); final Version version = header.getVersion(); - if (header.isHandshake()) { - messageListener.onRequestReceived(requestId, action); - // Cannot short circuit handshakes - assert message.isShortCircuit() == false; - final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput()); - assertRemoteVersion(stream, header.getVersion()); - final TransportChannel transportChannel = new TcpTransportChannel( - outboundHandler, - channel, - action, - requestId, - version, - header.getFeatures(), - header.isCompressed(), - header.isHandshake(), - message.takeBreakerReleaseControl() - ); - try { - handshaker.handleHandshake(transportChannel, requestId, stream); - } catch (Exception e) { - if (Version.CURRENT.isCompatible(header.getVersion())) { - sendErrorResponse(action, transportChannel, e); - } else { - logger.warn( - new ParameterizedMessage( - "could not send error response to handshake received on [{}] using wire format version [{}], closing channel", - channel, - header.getVersion() - ), - e - ); - channel.close(); - } - } - } else { - final TransportChannel transportChannel = new TcpTransportChannel( - outboundHandler, - channel, - action, - requestId, - version, - header.getFeatures(), - header.isCompressed(), - header.isHandshake(), - message.takeBreakerReleaseControl() - ); - try { + Span span = tracer.startSpan(SpanBuilder.from(action, channel)); + channel = TraceableTcpChannel.create(channel, span, tracer); + try (SpanScope spanScope = tracer.withSpanInScope(span)) { + if (header.isHandshake()) { messageListener.onRequestReceived(requestId, action); - if (message.isShortCircuit()) { - sendErrorResponse(action, transportChannel, message.getException()); - } else { - final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput()); - assertRemoteVersion(stream, header.getVersion()); - final RequestHandlerRegistry reg = requestHandlers.getHandler(action); - assert reg != null; - - final T request = newRequest(requestId, action, stream, reg); - request.remoteAddress(new TransportAddress(channel.getRemoteAddress())); - checkStreamIsFullyConsumed(requestId, action, stream); - - final String executor = reg.getExecutor(); - if (ThreadPool.Names.SAME.equals(executor)) { - try { - reg.processMessageReceived(request, transportChannel); - } catch (Exception e) { - sendErrorResponse(reg.getAction(), transportChannel, e); - } + // Cannot short circuit handshakes + assert message.isShortCircuit() == false; + final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput()); + assertRemoteVersion(stream, header.getVersion()); + final TransportChannel transportChannel = new TcpTransportChannel( + outboundHandler, + channel, + action, + requestId, + version, + header.getFeatures(), + header.isCompressed(), + header.isHandshake(), + message.takeBreakerReleaseControl() + ); + try { + handshaker.handleHandshake(transportChannel, requestId, stream); + } catch (Exception e) { + if (Version.CURRENT.isCompatible(header.getVersion())) { + sendErrorResponse(action, transportChannel, e); } else { - threadPool.executor(executor).execute(new RequestHandler<>(reg, request, transportChannel)); + logger.warn( + new ParameterizedMessage( + "could not send error response to handshake received on [{}] using wire format version [{}], closing channel", + channel, + header.getVersion() + ), + e + ); + channel.close(); } } - } catch (Exception e) { - sendErrorResponse(action, transportChannel, e); + } else { + final TransportChannel transportChannel = new TcpTransportChannel( + outboundHandler, + channel, + action, + requestId, + version, + header.getFeatures(), + header.isCompressed(), + header.isHandshake(), + message.takeBreakerReleaseControl() + ); + try { + messageListener.onRequestReceived(requestId, action); + if (message.isShortCircuit()) { + sendErrorResponse(action, transportChannel, message.getException()); + } else { + final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput()); + assertRemoteVersion(stream, header.getVersion()); + final RequestHandlerRegistry reg = requestHandlers.getHandler(action); + assert reg != null; + + final T request = newRequest(requestId, action, stream, reg); + request.remoteAddress(new TransportAddress(channel.getRemoteAddress())); + checkStreamIsFullyConsumed(requestId, action, stream); + + final String executor = reg.getExecutor(); + if (ThreadPool.Names.SAME.equals(executor)) { + try { + reg.processMessageReceived(request, transportChannel); + } catch (Exception e) { + sendErrorResponse(reg.getAction(), transportChannel, e); + } + } else { + threadPool.executor(executor).execute(new RequestHandler<>(reg, request, transportChannel)); + } + } + } catch (Exception e) { + sendErrorResponse(action, transportChannel, e); + } } } } diff --git a/server/src/main/java/org/opensearch/transport/TcpTransport.java b/server/src/main/java/org/opensearch/transport/TcpTransport.java index 7da7dcad13120..d0e6516973382 100644 --- a/server/src/main/java/org/opensearch/transport/TcpTransport.java +++ b/server/src/main/java/org/opensearch/transport/TcpTransport.java @@ -68,6 +68,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.monitor.jvm.JvmInfo; import org.opensearch.node.Node; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -159,7 +160,8 @@ public TcpTransport( PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService + NetworkService networkService, + Tracer tracer ) { this.settings = settings; this.profileSettings = getProfileSettings(settings); @@ -208,7 +210,8 @@ public TcpTransport( handshaker, keepAlive, requestHandlers, - responseHandlers + responseHandlers, + tracer ); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index 07e149dd72164..a3fa0f9cb16e4 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -211,7 +211,8 @@ public TestNode(String name, ThreadPool threadPool, Settings settings) { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ), threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, diff --git a/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java index fbac465f946f4..3bd8930064563 100644 --- a/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java @@ -135,7 +135,8 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception { new NetworkService(emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ) ) { diff --git a/server/src/test/java/org/opensearch/action/support/replication/BroadcastReplicationTests.java b/server/src/test/java/org/opensearch/action/support/replication/BroadcastReplicationTests.java index 77c9c64ad6611..19a9918fa4561 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/BroadcastReplicationTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/BroadcastReplicationTests.java @@ -116,7 +116,8 @@ public void setUp() throws Exception { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - circuitBreakerService + circuitBreakerService, + NoopTracer.INSTANCE ); clusterService = createClusterService(threadPool); transportService = new TransportService( diff --git a/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationActionTests.java index 0bee99f4d5656..03150d6e30755 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationActionTests.java @@ -1318,7 +1318,8 @@ public void testRetryOnReplicaWithRealTransport() throws Exception { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ); transportService = new MockTransportService( Settings.EMPTY, diff --git a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java index d28a4a51999e6..0ca118fe422a5 100644 --- a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java +++ b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java @@ -118,7 +118,8 @@ public Map> getTransports( PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService + NetworkService networkService, + Tracer tracer ) { return Collections.singletonMap("custom", custom); } @@ -176,7 +177,8 @@ public Map> getTransports( PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService + NetworkService networkService, + Tracer tracer ) { return Collections.singletonMap("default_custom", customTransport); } @@ -220,7 +222,8 @@ public Map> getTransports( PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService + NetworkService networkService, + Tracer tracer ) { return Collections.singletonMap("default_custom", customTransport); } diff --git a/server/src/test/java/org/opensearch/discovery/FileBasedSeedHostsProviderTests.java b/server/src/test/java/org/opensearch/discovery/FileBasedSeedHostsProviderTests.java index 688a532a61c4a..f4515361a89b8 100644 --- a/server/src/test/java/org/opensearch/discovery/FileBasedSeedHostsProviderTests.java +++ b/server/src/test/java/org/opensearch/discovery/FileBasedSeedHostsProviderTests.java @@ -100,7 +100,8 @@ private void createTransportSvc() { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ) { @Override public BoundTransportAddress boundAddress() { diff --git a/server/src/test/java/org/opensearch/discovery/SeedHostsResolverTests.java b/server/src/test/java/org/opensearch/discovery/SeedHostsResolverTests.java index dc0829adac101..421f6c6fe279b 100644 --- a/server/src/test/java/org/opensearch/discovery/SeedHostsResolverTests.java +++ b/server/src/test/java/org/opensearch/discovery/SeedHostsResolverTests.java @@ -185,7 +185,8 @@ public void testRemovingLocalAddresses() { networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ) { @Override @@ -237,7 +238,8 @@ public void testUnknownHost() { networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ) { @Override @@ -292,7 +294,8 @@ public void testResolveTimeout() { networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ) { @Override @@ -368,7 +371,8 @@ public void testCancellationOnClose() throws InterruptedException { networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ) { @Override @@ -432,7 +436,8 @@ public void testInvalidHosts() throws IllegalAccessException { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ) { @Override public BoundTransportAddress boundAddress() { diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java index c61afdd5c5261..3c25dbdff3342 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java +++ b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java @@ -117,7 +117,8 @@ public void setup() throws Exception { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ); transportService = new MockTransportService( settings, diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java index 1dede94c68208..c4d2f81f7cf79 100644 --- a/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java @@ -68,7 +68,8 @@ public void setup() throws Exception { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ); transportService = new MockTransportService( settings, diff --git a/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java b/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java index e237214ab88f5..0dae0ae1b4e0b 100644 --- a/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java +++ b/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java @@ -68,7 +68,8 @@ public void setup() throws Exception { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ); transportService = new MockTransportService( settings, diff --git a/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java b/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java index fe738ff7d85e6..9da976de7d7f6 100644 --- a/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java +++ b/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java @@ -79,7 +79,8 @@ public void setup() throws Exception { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ); transportService = new MockTransportService( settings, diff --git a/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java b/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java index 9a261c5745bc2..e002297911788 100644 --- a/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java @@ -50,6 +50,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.tasks.TaskManager; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; @@ -117,7 +118,8 @@ public void sendMessage(BytesReference reference, ActionListener listener) handshaker, keepAlive, requestHandlers, - responseHandlers + responseHandlers, + NoopTracer.INSTANCE ); } diff --git a/server/src/test/java/org/opensearch/transport/TcpTransportTests.java b/server/src/test/java/org/opensearch/transport/TcpTransportTests.java index 06545b77c6d76..7ab78cca7d615 100644 --- a/server/src/test/java/org/opensearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/opensearch/transport/TcpTransportTests.java @@ -47,6 +47,7 @@ import org.opensearch.common.util.MockPageCacheRecycler; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.junit.annotations.TestLogging; @@ -255,7 +256,8 @@ private void testDefaultSeedAddresses(final Settings settings, Matcher> getTransports( PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService + NetworkService networkService, + Tracer tracer ) { return Collections.singletonMap( MOCK_NIO_TRANSPORT_NAME, @@ -68,7 +70,8 @@ public Map> getTransports( networkService, pageCacheRecycler, namedWriteableRegistry, - circuitBreakerService + circuitBreakerService, + tracer ) ); } diff --git a/test/framework/src/test/java/org/opensearch/transport/nio/SimpleMockNioTransportTests.java b/test/framework/src/test/java/org/opensearch/transport/nio/SimpleMockNioTransportTests.java index fb77161a02aef..ce401ad99fad7 100644 --- a/test/framework/src/test/java/org/opensearch/transport/nio/SimpleMockNioTransportTests.java +++ b/test/framework/src/test/java/org/opensearch/transport/nio/SimpleMockNioTransportTests.java @@ -42,6 +42,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.transport.AbstractSimpleTransportTestCase; import org.opensearch.transport.ConnectTransportException; import org.opensearch.transport.ConnectionProfile; @@ -71,7 +72,8 @@ protected Transport build(Settings settings, final Version version, ClusterSetti networkService, new MockPageCacheRecycler(settings), namedWriteableRegistry, - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ) { @Override From 69ba6e3c0969771b0305389baaaa12006e58a095 Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Wed, 20 Sep 2023 22:33:03 +0530 Subject: [PATCH 2/9] Add CHANGELOG Signed-off-by: Gagan Juneja --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a37866012ba1..2ddd641e2c85b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -124,6 +124,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Tracing Framework] Add support for SpanKind. ([#10122](https://github.com/opensearch-project/OpenSearch/pull/10122)) - Pass parent filter to inner query in nested query ([#10246](https://github.com/opensearch-project/OpenSearch/pull/10246)) - Disable concurrent segment search when terminate_after is used ([#10200](https://github.com/opensearch-project/OpenSearch/pull/10200)) +- Add instrumentation in Inbound Handler. ([#100143](https://github.com/opensearch-project/OpenSearch/pull/10143)) ### Deprecated From 8844fc8baebfe18f25162fe9d3a1142785cda67c Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Mon, 2 Oct 2023 21:59:24 +0530 Subject: [PATCH 3/9] Address review comment Signed-off-by: Gagan Juneja --- .../channels/TraceableTransportChannel.java | 100 ++++++++++++++++++ .../opensearch/transport/InboundHandler.java | 19 ++-- 2 files changed, 110 insertions(+), 9 deletions(-) create mode 100644 server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTransportChannel.java diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTransportChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTransportChannel.java new file mode 100644 index 0000000000000..704b781f74f81 --- /dev/null +++ b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTransportChannel.java @@ -0,0 +1,100 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.channels; + +import org.opensearch.Version; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.transport.TransportResponse; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.telemetry.tracing.listener.TraceableActionListener; +import org.opensearch.transport.TcpChannel; +import org.opensearch.transport.TransportChannel; + +import java.io.IOException; + +/** + * Tracer wrapped {@link TransportChannel} + */ +public class TraceableTransportChannel implements TransportChannel { + + private final TransportChannel delegate; + private final Span span; + private final Tracer tracer; + + private final TcpChannel tcpChannel; + + private final static ActionListener DUMMY_ACTION_LISTENER = ActionListener.wrap(() -> {}); + + /** + * Constructor. + * @param delegate delegate + * @param span span + * @param tracer tracer + */ + public TraceableTransportChannel(TransportChannel delegate, Span span, Tracer tracer, TcpChannel tcpChannel) { + this.delegate = delegate; + this.span = span; + this.tracer = tracer; + this.tcpChannel = tcpChannel; + } + + /** + * Factory method. + * + * @param delegate delegate + * @param span span + * @param tracer tracer + * @return transport channel + */ + public static TransportChannel create(TransportChannel delegate, final Span span, final Tracer tracer, final TcpChannel tcpChannel) { + if (FeatureFlags.isEnabled(FeatureFlags.TELEMETRY) == true) { + tcpChannel.addCloseListener(TraceableActionListener.create(DUMMY_ACTION_LISTENER, span, tracer)); + return new TraceableTransportChannel(delegate, span, tracer, tcpChannel); + } else { + return delegate; + } + } + + @Override + public String getProfileName() { + return delegate.getProfileName(); + } + + @Override + public String getChannelType() { + return delegate.getChannelType(); + } + + @Override + public void sendResponse(TransportResponse response) throws IOException { + try (SpanScope scope = tracer.withSpanInScope(span)) { + delegate.sendResponse(response); + } finally { + span.endSpan(); + } + } + + @Override + public void sendResponse(Exception exception) throws IOException { + try (SpanScope scope = tracer.withSpanInScope(span)) { + delegate.sendResponse(exception); + } finally { + span.setError(exception); + span.endSpan(); + } + } + + @Override + public Version getVersion() { + return delegate.getVersion(); + } +} diff --git a/server/src/main/java/org/opensearch/transport/InboundHandler.java b/server/src/main/java/org/opensearch/transport/InboundHandler.java index 020e2cdba4b38..6ea08a748a0cd 100644 --- a/server/src/main/java/org/opensearch/transport/InboundHandler.java +++ b/server/src/main/java/org/opensearch/transport/InboundHandler.java @@ -50,7 +50,7 @@ import org.opensearch.telemetry.tracing.SpanBuilder; import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.Tracer; -import org.opensearch.telemetry.tracing.channels.TraceableTcpChannel; +import org.opensearch.telemetry.tracing.channels.TraceableTransportChannel; import org.opensearch.threadpool.ThreadPool; import java.io.EOFException; @@ -193,7 +193,6 @@ private void handleRequest(TcpChannel channel, Head final long requestId = header.getRequestId(); final Version version = header.getVersion(); Span span = tracer.startSpan(SpanBuilder.from(action, channel)); - channel = TraceableTcpChannel.create(channel, span, tracer); try (SpanScope spanScope = tracer.withSpanInScope(span)) { if (header.isHandshake()) { messageListener.onRequestReceived(requestId, action); @@ -212,11 +211,12 @@ private void handleRequest(TcpChannel channel, Head header.isHandshake(), message.takeBreakerReleaseControl() ); + TransportChannel traceableTransportChannel = TraceableTransportChannel.create(transportChannel, span, tracer, channel); try { - handshaker.handleHandshake(transportChannel, requestId, stream); + handshaker.handleHandshake(traceableTransportChannel, requestId, stream); } catch (Exception e) { if (Version.CURRENT.isCompatible(header.getVersion())) { - sendErrorResponse(action, transportChannel, e); + sendErrorResponse(action, traceableTransportChannel, e); } else { logger.warn( new ParameterizedMessage( @@ -241,10 +241,11 @@ private void handleRequest(TcpChannel channel, Head header.isHandshake(), message.takeBreakerReleaseControl() ); + TransportChannel traceableTransportChannel = TraceableTransportChannel.create(transportChannel, span, tracer, channel); try { messageListener.onRequestReceived(requestId, action); if (message.isShortCircuit()) { - sendErrorResponse(action, transportChannel, message.getException()); + sendErrorResponse(action, traceableTransportChannel, message.getException()); } else { final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput()); assertRemoteVersion(stream, header.getVersion()); @@ -258,16 +259,16 @@ private void handleRequest(TcpChannel channel, Head final String executor = reg.getExecutor(); if (ThreadPool.Names.SAME.equals(executor)) { try { - reg.processMessageReceived(request, transportChannel); + reg.processMessageReceived(request, traceableTransportChannel); } catch (Exception e) { - sendErrorResponse(reg.getAction(), transportChannel, e); + sendErrorResponse(reg.getAction(), traceableTransportChannel, e); } } else { - threadPool.executor(executor).execute(new RequestHandler<>(reg, request, transportChannel)); + threadPool.executor(executor).execute(new RequestHandler<>(reg, request, traceableTransportChannel)); } } } catch (Exception e) { - sendErrorResponse(action, transportChannel, e); + sendErrorResponse(action, traceableTransportChannel, e); } } } From ee883d96c029dd6dde2b1c9c5b1d920db56b00fe Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Tue, 3 Oct 2023 14:34:03 +0530 Subject: [PATCH 4/9] Address review comment Signed-off-by: Gagan Juneja --- .../telemetry/tracing/OTelSpan.java | 2 +- .../telemetry/tracing/SpanBuilder.java | 6 +- .../tracing/channels/TraceableTcpChannel.java | 120 ------------------ .../channels/TraceableTransportChannel.java | 18 ++- .../opensearch/transport/InboundHandler.java | 4 + .../test/telemetry/tracing/MockSpan.java | 2 +- 6 files changed, 24 insertions(+), 128 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpChannel.java diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java index 8ad03d807d9da..588f7953e329f 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java @@ -57,7 +57,7 @@ public void addAttribute(String key, Boolean value) { @Override public void setError(Exception exception) { - delegateSpan.setStatus(StatusCode.ERROR, exception.getMessage()); + delegateSpan.setStatus(StatusCode.ERROR, exception != null ? exception.getMessage() : "no description"); } @Override diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java index 53c59487049f2..d97fbd371ab2a 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java @@ -135,11 +135,13 @@ private static Attributes buildSpanAttributes(String action, Transport.Connectio * @return context */ public static SpanCreationContext from(String action, TcpChannel tcpChannel) { - return new SpanCreationContext(createSpanName(action, tcpChannel), buildSpanAttributes(action, tcpChannel)); + return SpanCreationContext.server().name(createSpanName(action, tcpChannel)).attributes(buildSpanAttributes(action, tcpChannel)); } private static String createSpanName(String action, TcpChannel tcpChannel) { - return action + SEPARATOR + tcpChannel.getLocalAddress().getHostString(); + return action + SEPARATOR + (tcpChannel.getRemoteAddress() != null + ? tcpChannel.getRemoteAddress().getHostString() + : tcpChannel.getLocalAddress().getHostString()); } private static Attributes buildSpanAttributes(String action, TcpChannel tcpChannel) { diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpChannel.java deleted file mode 100644 index cb3ca1ba30d73..0000000000000 --- a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpChannel.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.telemetry.tracing.channels; - -import org.opensearch.common.util.FeatureFlags; -import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.telemetry.tracing.Span; -import org.opensearch.telemetry.tracing.SpanScope; -import org.opensearch.telemetry.tracing.Tracer; -import org.opensearch.telemetry.tracing.listener.TraceableActionListener; -import org.opensearch.transport.TcpChannel; - -import java.net.InetSocketAddress; - -/** - * Tracer wrapped {@link TcpChannel} - */ -public class TraceableTcpChannel implements TcpChannel { - - private final TcpChannel delegate; - private final Span span; - private final Tracer tracer; - - private final static ActionListener DUMMY_ACTION_LISTENER = ActionListener.wrap(() -> {}); - - /** - * Constructor. - * @param delegate delegate - * @param span span - * @param tracer tracer - */ - public TraceableTcpChannel(TcpChannel delegate, Span span, Tracer tracer) { - this.delegate = delegate; - this.span = span; - this.tracer = tracer; - } - - /** - * Factory method. - * - * @param delegate delegate - * @param span span - * @param tracer tracer - * @return tcp channel - */ - public static TcpChannel create(TcpChannel delegate, final Span span, final Tracer tracer) { - if (FeatureFlags.isEnabled(FeatureFlags.TELEMETRY) == true) { - TraceableTcpChannel traceableTcpChannel = new TraceableTcpChannel(delegate, span, tracer); - traceableTcpChannel.addCloseListener(TraceableActionListener.create(DUMMY_ACTION_LISTENER, span, tracer)); - return traceableTcpChannel; - } else { - return delegate; - } - } - - @Override - public void close() { - try (SpanScope scope = tracer.withSpanInScope(span)) { - delegate.close(); - } finally { - span.endSpan(); - } - } - - @Override - public void addCloseListener(ActionListener listener) { - delegate.addCloseListener(listener); - } - - @Override - public boolean isOpen() { - return delegate.isOpen(); - } - - @Override - public boolean isServerChannel() { - return delegate.isServerChannel(); - } - - @Override - public String getProfile() { - return delegate.getProfile(); - } - - @Override - public InetSocketAddress getLocalAddress() { - return delegate.getLocalAddress(); - } - - @Override - public InetSocketAddress getRemoteAddress() { - return delegate.getRemoteAddress(); - } - - @Override - public void sendMessage(BytesReference reference, ActionListener listener) { - try (SpanScope scope = tracer.withSpanInScope(span)) { - delegate.sendMessage(reference, listener); - } finally { - span.endSpan(); - } - } - - @Override - public void addConnectListener(ActionListener listener) { - delegate.addConnectListener(listener); - } - - @Override - public ChannelStats getChannelStats() { - return delegate.getChannelStats(); - } -} diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTransportChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTransportChannel.java index 704b781f74f81..19d4d80a75d34 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTransportChannel.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTransportChannel.java @@ -15,7 +15,6 @@ import org.opensearch.telemetry.tracing.Span; import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.Tracer; -import org.opensearch.telemetry.tracing.listener.TraceableActionListener; import org.opensearch.transport.TcpChannel; import org.opensearch.transport.TransportChannel; @@ -32,8 +31,6 @@ public class TraceableTransportChannel implements TransportChannel { private final TcpChannel tcpChannel; - private final static ActionListener DUMMY_ACTION_LISTENER = ActionListener.wrap(() -> {}); - /** * Constructor. * @param delegate delegate @@ -57,7 +54,20 @@ public TraceableTransportChannel(TransportChannel delegate, Span span, Tracer tr */ public static TransportChannel create(TransportChannel delegate, final Span span, final Tracer tracer, final TcpChannel tcpChannel) { if (FeatureFlags.isEnabled(FeatureFlags.TELEMETRY) == true) { - tcpChannel.addCloseListener(TraceableActionListener.create(DUMMY_ACTION_LISTENER, span, tracer)); + tcpChannel.addCloseListener(new ActionListener() { + @Override + public void onResponse(Void unused) { + onFailure(null); + } + + @Override + public void onFailure(Exception e) { + span.addEvent("The TransportChannel was closed without sending the response"); + span.setError(e); + span.endSpan(); + } + }); + return new TraceableTransportChannel(delegate, span, tracer, tcpChannel); } else { return delegate; diff --git a/server/src/main/java/org/opensearch/transport/InboundHandler.java b/server/src/main/java/org/opensearch/transport/InboundHandler.java index 6ea08a748a0cd..38b8dab2bd83f 100644 --- a/server/src/main/java/org/opensearch/transport/InboundHandler.java +++ b/server/src/main/java/org/opensearch/transport/InboundHandler.java @@ -271,6 +271,10 @@ private void handleRequest(TcpChannel channel, Head sendErrorResponse(action, traceableTransportChannel, e); } } + } catch (Exception e) { + span.setError(e); + span.endSpan(); + throw e; } } diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockSpan.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockSpan.java index 4b661dc0ad0fe..f13ef2d26284f 100644 --- a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockSpan.java +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockSpan.java @@ -166,7 +166,7 @@ public Long getEndTime() { } public void setError(Exception exception) { - putMetadata("ERROR", exception.getMessage()); + putMetadata("ERROR", exception != null ? exception.getMessage() : null); } private static class IdGenerator { From dff81e1093b47801a72b24aba162a2a46a465d87 Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Tue, 3 Oct 2023 18:49:06 +0530 Subject: [PATCH 5/9] Address review comment Signed-off-by: Gagan Juneja --- .../java/org/opensearch/telemetry/tracing/OTelSpan.java | 4 +++- .../tracing/channels/TraceableTransportChannel.java | 6 +++--- .../org/opensearch/test/telemetry/tracing/MockSpan.java | 4 +++- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java index 588f7953e329f..fc917968579e1 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java @@ -57,7 +57,9 @@ public void addAttribute(String key, Boolean value) { @Override public void setError(Exception exception) { - delegateSpan.setStatus(StatusCode.ERROR, exception != null ? exception.getMessage() : "no description"); + if (exception != null) { + delegateSpan.setStatus(StatusCode.ERROR, exception.getMessage()); + } } @Override diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTransportChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTransportChannel.java index 19d4d80a75d34..bf88b69a3a26e 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTransportChannel.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTransportChannel.java @@ -29,8 +29,6 @@ public class TraceableTransportChannel implements TransportChannel { private final Span span; private final Tracer tracer; - private final TcpChannel tcpChannel; - /** * Constructor. * @param delegate delegate @@ -41,7 +39,6 @@ public TraceableTransportChannel(TransportChannel delegate, Span span, Tracer tr this.delegate = delegate; this.span = span; this.tracer = tracer; - this.tcpChannel = tcpChannel; } /** @@ -88,6 +85,9 @@ public String getChannelType() { public void sendResponse(TransportResponse response) throws IOException { try (SpanScope scope = tracer.withSpanInScope(span)) { delegate.sendResponse(response); + } catch (final IOException ex) { + span.setError(ex); + throw ex; } finally { span.endSpan(); } diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockSpan.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockSpan.java index f13ef2d26284f..c5d179f6412a8 100644 --- a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockSpan.java +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockSpan.java @@ -166,7 +166,9 @@ public Long getEndTime() { } public void setError(Exception exception) { - putMetadata("ERROR", exception != null ? exception.getMessage() : null); + if (exception != null) { + putMetadata("ERROR", exception.getMessage()); + } } private static class IdGenerator { From 68ce05b7187aa81169505809bfe42f921cafa00b Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Tue, 3 Oct 2023 21:35:12 +0530 Subject: [PATCH 6/9] Empty-Commit Signed-off-by: Gagan Juneja From 6b3c1322049913771d737b53228df0f01f8a54af Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Thu, 5 Oct 2023 00:27:50 +0530 Subject: [PATCH 7/9] Address review comment Signed-off-by: Gagan Juneja --- ...java => TraceableTcpTransportChannel.java} | 10 ++++-- .../transport/BaseTcpTransportChannel.java | 32 +++++++++++++++++++ .../opensearch/transport/InboundHandler.java | 10 +++--- .../transport/RequestHandlerRegistry.java | 4 +-- .../transport/TcpTransportChannel.java | 6 ++-- 5 files changed, 48 insertions(+), 14 deletions(-) rename server/src/main/java/org/opensearch/telemetry/tracing/channels/{TraceableTransportChannel.java => TraceableTcpTransportChannel.java} (87%) create mode 100644 server/src/main/java/org/opensearch/transport/BaseTcpTransportChannel.java diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTransportChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java similarity index 87% rename from server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTransportChannel.java rename to server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java index bf88b69a3a26e..e9222b0787433 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTransportChannel.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java @@ -15,6 +15,7 @@ import org.opensearch.telemetry.tracing.Span; import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.transport.BaseTcpTransportChannel; import org.opensearch.transport.TcpChannel; import org.opensearch.transport.TransportChannel; @@ -23,7 +24,7 @@ /** * Tracer wrapped {@link TransportChannel} */ -public class TraceableTransportChannel implements TransportChannel { +public class TraceableTcpTransportChannel extends BaseTcpTransportChannel { private final TransportChannel delegate; private final Span span; @@ -34,8 +35,10 @@ public class TraceableTransportChannel implements TransportChannel { * @param delegate delegate * @param span span * @param tracer tracer + * @param channel channel */ - public TraceableTransportChannel(TransportChannel delegate, Span span, Tracer tracer, TcpChannel tcpChannel) { + public TraceableTcpTransportChannel(TransportChannel delegate, Span span, Tracer tracer, TcpChannel channel) { + super(channel); this.delegate = delegate; this.span = span; this.tracer = tracer; @@ -47,6 +50,7 @@ public TraceableTransportChannel(TransportChannel delegate, Span span, Tracer tr * @param delegate delegate * @param span span * @param tracer tracer + * @param tcpChannel tcpChannel * @return transport channel */ public static TransportChannel create(TransportChannel delegate, final Span span, final Tracer tracer, final TcpChannel tcpChannel) { @@ -65,7 +69,7 @@ public void onFailure(Exception e) { } }); - return new TraceableTransportChannel(delegate, span, tracer, tcpChannel); + return new TraceableTcpTransportChannel(delegate, span, tracer, tcpChannel); } else { return delegate; } diff --git a/server/src/main/java/org/opensearch/transport/BaseTcpTransportChannel.java b/server/src/main/java/org/opensearch/transport/BaseTcpTransportChannel.java new file mode 100644 index 0000000000000..b7c4523c98dd2 --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/BaseTcpTransportChannel.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport; + +/** + * Base class TcpTransportChannel + */ +public abstract class BaseTcpTransportChannel implements TransportChannel { + private final TcpChannel channel; + + /** + * Constructor. + * @param channel tcp channel + */ + public BaseTcpTransportChannel(TcpChannel channel) { + this.channel = channel; + } + + /** + * Returns {@link TcpChannel} + * @return TcpChannel + */ + public TcpChannel getChannel() { + return channel; + } +} diff --git a/server/src/main/java/org/opensearch/transport/InboundHandler.java b/server/src/main/java/org/opensearch/transport/InboundHandler.java index 38b8dab2bd83f..f6e90aed860b3 100644 --- a/server/src/main/java/org/opensearch/transport/InboundHandler.java +++ b/server/src/main/java/org/opensearch/transport/InboundHandler.java @@ -50,7 +50,7 @@ import org.opensearch.telemetry.tracing.SpanBuilder; import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.Tracer; -import org.opensearch.telemetry.tracing.channels.TraceableTransportChannel; +import org.opensearch.telemetry.tracing.channels.TraceableTcpTransportChannel; import org.opensearch.threadpool.ThreadPool; import java.io.EOFException; @@ -200,7 +200,7 @@ private void handleRequest(TcpChannel channel, Head assert message.isShortCircuit() == false; final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput()); assertRemoteVersion(stream, header.getVersion()); - final TransportChannel transportChannel = new TcpTransportChannel( + final TcpTransportChannel transportChannel = new TcpTransportChannel( outboundHandler, channel, action, @@ -211,7 +211,7 @@ private void handleRequest(TcpChannel channel, Head header.isHandshake(), message.takeBreakerReleaseControl() ); - TransportChannel traceableTransportChannel = TraceableTransportChannel.create(transportChannel, span, tracer, channel); + TransportChannel traceableTransportChannel = TraceableTcpTransportChannel.create(transportChannel, span, tracer, channel); try { handshaker.handleHandshake(traceableTransportChannel, requestId, stream); } catch (Exception e) { @@ -230,7 +230,7 @@ private void handleRequest(TcpChannel channel, Head } } } else { - final TransportChannel transportChannel = new TcpTransportChannel( + final TcpTransportChannel transportChannel = new TcpTransportChannel( outboundHandler, channel, action, @@ -241,7 +241,7 @@ private void handleRequest(TcpChannel channel, Head header.isHandshake(), message.takeBreakerReleaseControl() ); - TransportChannel traceableTransportChannel = TraceableTransportChannel.create(transportChannel, span, tracer, channel); + TransportChannel traceableTransportChannel = TraceableTcpTransportChannel.create(transportChannel, span, tracer, channel); try { messageListener.onRequestReceived(requestId, action); if (message.isShortCircuit()) { diff --git a/server/src/main/java/org/opensearch/transport/RequestHandlerRegistry.java b/server/src/main/java/org/opensearch/transport/RequestHandlerRegistry.java index 464282730d2b2..98c182c562928 100644 --- a/server/src/main/java/org/opensearch/transport/RequestHandlerRegistry.java +++ b/server/src/main/java/org/opensearch/transport/RequestHandlerRegistry.java @@ -91,14 +91,14 @@ public void processMessageReceived(Request request, TransportChannel channel) th Releasable unregisterTask = () -> taskManager.unregister(task); try { - if (channel instanceof TcpTransportChannel && task instanceof CancellableTask) { + if (channel instanceof BaseTcpTransportChannel && task instanceof CancellableTask) { if (request instanceof ShardSearchRequest) { // on receiving request, update the inbound network time to reflect time spent in transit over the network ((ShardSearchRequest) request).setInboundNetworkTime( Math.max(0, System.currentTimeMillis() - ((ShardSearchRequest) request).getInboundNetworkTime()) ); } - final TcpChannel tcpChannel = ((TcpTransportChannel) channel).getChannel(); + final TcpChannel tcpChannel = ((BaseTcpTransportChannel) channel).getChannel(); final Releasable stopTracking = taskManager.startTrackingCancellableChannelTask(tcpChannel, (CancellableTask) task); unregisterTask = Releasables.wrap(unregisterTask, stopTracking); } diff --git a/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java b/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java index 00702d08902a9..34ea3c0ab9996 100644 --- a/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java +++ b/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java @@ -46,7 +46,7 @@ * * @opensearch.internal */ -public final class TcpTransportChannel implements TransportChannel { +public final class TcpTransportChannel extends BaseTcpTransportChannel { private final AtomicBoolean released = new AtomicBoolean(); private final OutboundHandler outboundHandler; @@ -70,6 +70,7 @@ public final class TcpTransportChannel implements TransportChannel { boolean isHandshake, Releasable breakerRelease ) { + super(channel); this.version = version; this.features = features; this.channel = channel; @@ -131,7 +132,4 @@ public Version getVersion() { return version; } - public TcpChannel getChannel() { - return channel; - } } From e8b033963c21b6540473cf61cc213c4aa032e544 Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Thu, 5 Oct 2023 03:10:16 +0530 Subject: [PATCH 8/9] Address review comment Signed-off-by: Gagan Juneja --- .../tracing/channels/TraceableTcpTransportChannel.java | 10 +++++----- .../opensearch/transport/BaseTcpTransportChannel.java | 1 + .../org/opensearch/transport/TcpTransportChannel.java | 8 +++----- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java index e9222b0787433..5ecea2fef1351 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java @@ -17,6 +17,7 @@ import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.transport.BaseTcpTransportChannel; import org.opensearch.transport.TcpChannel; +import org.opensearch.transport.TcpTransportChannel; import org.opensearch.transport.TransportChannel; import java.io.IOException; @@ -35,10 +36,9 @@ public class TraceableTcpTransportChannel extends BaseTcpTransportChannel { * @param delegate delegate * @param span span * @param tracer tracer - * @param channel channel */ - public TraceableTcpTransportChannel(TransportChannel delegate, Span span, Tracer tracer, TcpChannel channel) { - super(channel); + public TraceableTcpTransportChannel(TcpTransportChannel delegate, Span span, Tracer tracer) { + super(delegate.getChannel()); this.delegate = delegate; this.span = span; this.tracer = tracer; @@ -53,7 +53,7 @@ public TraceableTcpTransportChannel(TransportChannel delegate, Span span, Tracer * @param tcpChannel tcpChannel * @return transport channel */ - public static TransportChannel create(TransportChannel delegate, final Span span, final Tracer tracer, final TcpChannel tcpChannel) { + public static TransportChannel create(TcpTransportChannel delegate, final Span span, final Tracer tracer, final TcpChannel tcpChannel) { if (FeatureFlags.isEnabled(FeatureFlags.TELEMETRY) == true) { tcpChannel.addCloseListener(new ActionListener() { @Override @@ -69,7 +69,7 @@ public void onFailure(Exception e) { } }); - return new TraceableTcpTransportChannel(delegate, span, tracer, tcpChannel); + return new TraceableTcpTransportChannel(delegate, span, tracer); } else { return delegate; } diff --git a/server/src/main/java/org/opensearch/transport/BaseTcpTransportChannel.java b/server/src/main/java/org/opensearch/transport/BaseTcpTransportChannel.java index b7c4523c98dd2..14e065d3350c7 100644 --- a/server/src/main/java/org/opensearch/transport/BaseTcpTransportChannel.java +++ b/server/src/main/java/org/opensearch/transport/BaseTcpTransportChannel.java @@ -29,4 +29,5 @@ public BaseTcpTransportChannel(TcpChannel channel) { public TcpChannel getChannel() { return channel; } + } diff --git a/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java b/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java index 34ea3c0ab9996..a7bedcf93e129 100644 --- a/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java +++ b/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java @@ -50,7 +50,6 @@ public final class TcpTransportChannel extends BaseTcpTransportChannel { private final AtomicBoolean released = new AtomicBoolean(); private final OutboundHandler outboundHandler; - private final TcpChannel channel; private final String action; private final long requestId; private final Version version; @@ -73,7 +72,6 @@ public final class TcpTransportChannel extends BaseTcpTransportChannel { super(channel); this.version = version; this.features = features; - this.channel = channel; this.outboundHandler = outboundHandler; this.action = action; this.requestId = requestId; @@ -84,7 +82,7 @@ public final class TcpTransportChannel extends BaseTcpTransportChannel { @Override public String getProfileName() { - return channel.getProfile(); + return getChannel().getProfile(); } @Override @@ -94,7 +92,7 @@ public void sendResponse(TransportResponse response) throws IOException { // update outbound network time with current time before sending response over network ((QuerySearchResult) response).getShardSearchRequest().setOutboundNetworkTime(System.currentTimeMillis()); } - outboundHandler.sendResponse(version, features, channel, requestId, action, response, compressResponse, isHandshake); + outboundHandler.sendResponse(version, features, getChannel(), requestId, action, response, compressResponse, isHandshake); } finally { release(false); } @@ -103,7 +101,7 @@ public void sendResponse(TransportResponse response) throws IOException { @Override public void sendResponse(Exception exception) throws IOException { try { - outboundHandler.sendErrorResponse(version, features, channel, requestId, action, exception); + outboundHandler.sendErrorResponse(version, features, getChannel(), requestId, action, exception); } finally { release(true); } From f13dc761b0e487a1dcc2f561d68cf7e498d26a05 Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Thu, 5 Oct 2023 03:40:05 +0530 Subject: [PATCH 9/9] Address review comment Signed-off-by: Gagan Juneja --- .../tracing/channels/TraceableTcpTransportChannel.java | 6 ++---- .../main/java/org/opensearch/transport/InboundHandler.java | 4 ++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java index 5ecea2fef1351..333e06eb037cb 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java @@ -16,7 +16,6 @@ import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.transport.BaseTcpTransportChannel; -import org.opensearch.transport.TcpChannel; import org.opensearch.transport.TcpTransportChannel; import org.opensearch.transport.TransportChannel; @@ -50,12 +49,11 @@ public TraceableTcpTransportChannel(TcpTransportChannel delegate, Span span, Tra * @param delegate delegate * @param span span * @param tracer tracer - * @param tcpChannel tcpChannel * @return transport channel */ - public static TransportChannel create(TcpTransportChannel delegate, final Span span, final Tracer tracer, final TcpChannel tcpChannel) { + public static TransportChannel create(TcpTransportChannel delegate, final Span span, final Tracer tracer) { if (FeatureFlags.isEnabled(FeatureFlags.TELEMETRY) == true) { - tcpChannel.addCloseListener(new ActionListener() { + delegate.getChannel().addCloseListener(new ActionListener() { @Override public void onResponse(Void unused) { onFailure(null); diff --git a/server/src/main/java/org/opensearch/transport/InboundHandler.java b/server/src/main/java/org/opensearch/transport/InboundHandler.java index f6e90aed860b3..c14a53e799319 100644 --- a/server/src/main/java/org/opensearch/transport/InboundHandler.java +++ b/server/src/main/java/org/opensearch/transport/InboundHandler.java @@ -211,7 +211,7 @@ private void handleRequest(TcpChannel channel, Head header.isHandshake(), message.takeBreakerReleaseControl() ); - TransportChannel traceableTransportChannel = TraceableTcpTransportChannel.create(transportChannel, span, tracer, channel); + TransportChannel traceableTransportChannel = TraceableTcpTransportChannel.create(transportChannel, span, tracer); try { handshaker.handleHandshake(traceableTransportChannel, requestId, stream); } catch (Exception e) { @@ -241,7 +241,7 @@ private void handleRequest(TcpChannel channel, Head header.isHandshake(), message.takeBreakerReleaseControl() ); - TransportChannel traceableTransportChannel = TraceableTcpTransportChannel.create(transportChannel, span, tracer, channel); + TransportChannel traceableTransportChannel = TraceableTcpTransportChannel.create(transportChannel, span, tracer); try { messageListener.onRequestReceived(requestId, action); if (message.isShortCircuit()) {