Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Tracing Instrumentation] Add instrumentation in InboundHandler #10381

Merged
merged 1 commit into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tracing Framework] Add support for SpanKind. ([#10122](https://github.com/opensearch-project/OpenSearch/pull/10122))
- Pass parent filter to inner query in nested query ([#10246](https://github.com/opensearch-project/OpenSearch/pull/10246))
- Disable concurrent segment search when terminate_after is used ([#10200](https://github.com/opensearch-project/OpenSearch/pull/10200))
- Add instrumentation in Inbound Handler. ([#100143](https://github.com/opensearch-project/OpenSearch/pull/10143))
- Enable remote segment upload backpressure by default ([#10356](https://github.com/opensearch-project/OpenSearch/pull/10356))

### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService
NetworkService networkService,
Tracer tracer
) {
return Collections.singletonMap(
NETTY_TRANSPORT_NAME,
Expand All @@ -108,7 +109,8 @@
pageCacheRecycler,
namedWriteableRegistry,
circuitBreakerService,
getSharedGroupFactory(settings)
getSharedGroupFactory(settings),

Check warning on line 112 in modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4Plugin.java

View check run for this annotation

Codecov / codecov/patch

modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4Plugin.java#L112

Added line #L112 was not covered by tests
tracer
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Netty4NioSocketChannel;
import org.opensearch.transport.NettyAllocator;
Expand Down Expand Up @@ -131,9 +132,10 @@ public Netty4Transport(
PageCacheRecycler pageCacheRecycler,
NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService,
SharedGroupFactory sharedGroupFactory
SharedGroupFactory sharedGroupFactory,
Tracer tracer
) {
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService, tracer);
Netty4Utils.setAvailableProcessors(OpenSearchExecutors.NODE_PROCESSORS_SETTING.get(settings));
NettyAllocator.logAllocatorDescriptionIfNeeded();
this.sharedGroupFactory = sharedGroupFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.SharedGroupFactory;
Expand Down Expand Up @@ -86,7 +87,8 @@ public void startThreadPool() {
recycler,
new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService(),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
);
nettyTransport.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -141,7 +142,8 @@ private TcpTransport startTransport(Settings settings, ThreadPool threadPool) {
recycler,
new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService(),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
);
transport.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.test.transport.StubbableTransport;
import org.opensearch.transport.AbstractSimpleTransportTestCase;
Expand Down Expand Up @@ -82,7 +83,8 @@ protected Transport build(Settings settings, final Version version, ClusterSetti
PageCacheRecycler.NON_RECYCLING_INSTANCE,
namedWriteableRegistry,
new NoneCircuitBreakerService(),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ protected MockTransportService createTransportService() {
new NetworkService(Collections.emptyList()),
PageCacheRecycler.NON_RECYCLING_INSTANCE,
writableRegistry(),
new NoneCircuitBreakerService()
new NoneCircuitBreakerService(),
NoopTracer.INSTANCE
) {
@Override
public TransportAddress[] addressesFromString(String address) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ protected MockTransportService createTransportService() {
networkService,
PageCacheRecycler.NON_RECYCLING_INSTANCE,
new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService()
new NoneCircuitBreakerService(),
NoopTracer.INSTANCE
),
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@

@Override
public void setError(Exception exception) {
delegateSpan.setStatus(StatusCode.ERROR, exception.getMessage());
if (exception != null) {
delegateSpan.setStatus(StatusCode.ERROR, exception.getMessage());

Check warning on line 61 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java#L61

Added line #L61 was not covered by tests
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.nio.NioSelector;
import org.opensearch.nio.NioSocketChannel;
import org.opensearch.nio.ServerChannelContext;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TcpTransport;
import org.opensearch.transport.TransportSettings;
Expand Down Expand Up @@ -84,9 +85,10 @@ protected NioTransport(
PageCacheRecycler pageCacheRecycler,
NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService,
NioGroupFactory groupFactory
NioGroupFactory groupFactory,
Tracer tracer
) {
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService, tracer);
this.pageAllocator = new PageAllocator(pageCacheRecycler);
this.groupFactory = groupFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService
NetworkService networkService,
Tracer tracer
) {
return Collections.singletonMap(
NIO_TRANSPORT_NAME,
Expand All @@ -103,7 +104,8 @@
pageCacheRecycler,
namedWriteableRegistry,
circuitBreakerService,
getNioGroupFactory(settings)
getNioGroupFactory(settings),

Check warning on line 107 in plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransportPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransportPlugin.java#L107

Added line #L107 was not covered by tests
tracer
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.test.transport.StubbableTransport;
import org.opensearch.transport.AbstractSimpleTransportTestCase;
Expand Down Expand Up @@ -81,7 +82,8 @@ protected Transport build(Settings settings, final Version version, ClusterSetti
new MockPageCacheRecycler(settings),
namedWriteableRegistry,
new NoneCircuitBreakerService(),
new NioGroupFactory(settings, logger)
new NioGroupFactory(settings, logger),
NoopTracer.INSTANCE
) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ public NetworkModule(
pageCacheRecycler,
circuitBreakerService,
namedWriteableRegistry,
networkService
networkService,
tracer
);
for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
registerTransport(entry.getKey(), entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ default Map<String, Supplier<Transport>> getTransports(
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService
NetworkService networkService,
Tracer tracer
) {
return Collections.emptyMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ private AttributeNames() {
*/
public static final String TRANSPORT_TARGET_HOST = "target_host";

/**
* Transport Service send request local host.
*/
public static final String TRANSPORT_HOST = "host";

/**
* Action Name.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.http.HttpRequest;
import org.opensearch.rest.RestRequest;
import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.transport.TcpChannel;
import org.opensearch.transport.Transport;

import java.util.Arrays;
Expand Down Expand Up @@ -127,4 +128,26 @@ private static Attributes buildSpanAttributes(String action, Transport.Connectio
return attributes;
}

/**
* Creates {@link SpanCreationContext} from Inbound Handler.
* @param action action.
* @param tcpChannel tcp channel.
* @return context
*/
public static SpanCreationContext from(String action, TcpChannel tcpChannel) {
return SpanCreationContext.server().name(createSpanName(action, tcpChannel)).attributes(buildSpanAttributes(action, tcpChannel));
}

private static String createSpanName(String action, TcpChannel tcpChannel) {
return action + SEPARATOR + (tcpChannel.getRemoteAddress() != null
? tcpChannel.getRemoteAddress().getHostString()
: tcpChannel.getLocalAddress().getHostString());
}

private static Attributes buildSpanAttributes(String action, TcpChannel tcpChannel) {
Attributes attributes = Attributes.create().addAttribute(AttributeNames.TRANSPORT_ACTION, action);
attributes.addAttribute(AttributeNames.TRANSPORT_HOST, tcpChannel.getLocalAddress().getHostString());
return attributes;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.channels;

import org.opensearch.Version;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.BaseTcpTransportChannel;
import org.opensearch.transport.TcpTransportChannel;
import org.opensearch.transport.TransportChannel;

import java.io.IOException;

/**
* Tracer wrapped {@link TransportChannel}
*/
public class TraceableTcpTransportChannel extends BaseTcpTransportChannel {

private final TransportChannel delegate;
private final Span span;
private final Tracer tracer;

/**
* Constructor.
* @param delegate delegate
* @param span span
* @param tracer tracer
*/
public TraceableTcpTransportChannel(TcpTransportChannel delegate, Span span, Tracer tracer) {
super(delegate.getChannel());
this.delegate = delegate;
this.span = span;
this.tracer = tracer;
}

/**
* Factory method.
*
* @param delegate delegate
* @param span span
* @param tracer tracer
* @return transport channel
*/
public static TransportChannel create(TcpTransportChannel delegate, final Span span, final Tracer tracer) {
if (FeatureFlags.isEnabled(FeatureFlags.TELEMETRY) == true) {
delegate.getChannel().addCloseListener(new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
onFailure(null);
}

@Override
public void onFailure(Exception e) {
span.addEvent("The TransportChannel was closed without sending the response");
span.setError(e);
span.endSpan();
}
});

return new TraceableTcpTransportChannel(delegate, span, tracer);
} else {
return delegate;
}
}

@Override
public String getProfileName() {
return delegate.getProfileName();

Check warning on line 78 in server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java#L78

Added line #L78 was not covered by tests
}

@Override
public String getChannelType() {
return delegate.getChannelType();
}

@Override
public void sendResponse(TransportResponse response) throws IOException {
try (SpanScope scope = tracer.withSpanInScope(span)) {
delegate.sendResponse(response);
} catch (final IOException ex) {
span.setError(ex);
throw ex;

Check warning on line 92 in server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java#L90-L92

Added lines #L90 - L92 were not covered by tests
} finally {
span.endSpan();
}
}

@Override
public void sendResponse(Exception exception) throws IOException {
try (SpanScope scope = tracer.withSpanInScope(span)) {
delegate.sendResponse(exception);
} finally {
span.setError(exception);
span.endSpan();
}
}

@Override
public Version getVersion() {
return delegate.getVersion();
}
}
Loading
Loading