From 546afa939a76975b582fc47539a789e1a419bbe5 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Mon, 20 Nov 2023 12:06:21 -0500 Subject: [PATCH] [BUG] The thread context is not properly cleared and messes up the traces (#10873) * [BUG] The thread context is not properly cleared and messes up the traces Signed-off-by: Andriy Redko * Address code review comments Signed-off-by: Andriy Redko * Address code review comments Signed-off-by: Andriy Redko --------- Signed-off-by: Andriy Redko Signed-off-by: Shivansh Arora --- CHANGELOG.md | 1 + .../telemetry/tracing/DefaultSpanScope.java | 4 +- .../telemetry/tracing/DefaultTracer.java | 9 +- .../opensearch/telemetry/tracing/Tracer.java | 4 +- .../tracing/TracingContextPropagator.java | 4 +- .../telemetry/tracing/noop/NoopTracer.java | 4 +- .../TransportTracer.java} | 16 +- .../{http => transport}/package-info.java | 4 +- .../telemetry/tracing/DefaultTracerTests.java | 35 ++-- .../TelemetryTracerEnabledSanityIT.java | 22 ++- .../tracing/OTelTracingContextPropagator.java | 10 +- .../OTelTracingContextPropagatorTests.java | 4 +- .../http/AbstractHttpServerTransport.java | 9 +- ...hreadContextBasedTracerContextStorage.java | 13 +- .../telemetry/tracing/TracerFactory.java | 12 +- .../telemetry/tracing/WrappedTracer.java | 4 +- .../opensearch/transport/InboundHandler.java | 11 +- .../transport/TransportService.java | 122 ++++++------ ...ContextBasedTracerContextStorageTests.java | 174 ++++++++++++++++++ .../tracing/MockTracingContextPropagator.java | 4 +- .../tracing/TelemetryValidators.java | 2 +- .../NumberOfTraceIDsEqualToRequests.java | 11 +- 22 files changed, 338 insertions(+), 141 deletions(-) rename libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/{http/HttpTracer.java => transport/TransportTracer.java} (64%) rename libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/{http => transport}/package-info.java (65%) create mode 100644 server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index a40580cbb3238..55f628a4532d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827)) - Fix compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944)) - Don't over-allocate in HeapBufferedAsyncEntityConsumer in order to consume the response ([#9993](https://github.com/opensearch-project/OpenSearch/pull/9993)) +- [BUG] Fix the thread context that is not properly cleared and messes up the traces ([#10873](https://github.com/opensearch-project/OpenSearch/pull/10873)) ### Security diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java index a5d515443b54d..decbf49f795c4 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java @@ -44,23 +44,23 @@ private DefaultSpanScope(Span span, SpanScope previousSpanScope, TracerContextSt public static SpanScope create(Span span, TracerContextStorage tracerContextStorage) { final SpanScope beforeSpanScope = spanScopeThreadLocal.get(); SpanScope newSpanScope = new DefaultSpanScope(span, beforeSpanScope, tracerContextStorage); - spanScopeThreadLocal.set(newSpanScope); return newSpanScope; } @Override public void close() { detach(); - spanScopeThreadLocal.set(previousSpanScope); } @Override public SpanScope attach() { + spanScopeThreadLocal.set(this); tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, this.span); return this; } private void detach() { + spanScopeThreadLocal.set(previousSpanScope); if (previousSpanScope != null) { tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, previousSpanScope.getSpan()); } else { diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java index a3bb64ea392a9..8f1a26d99e725 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java @@ -12,7 +12,7 @@ import java.io.Closeable; import java.io.IOException; -import java.util.List; +import java.util.Collection; import java.util.Map; import java.util.Optional; @@ -53,7 +53,6 @@ public Span startSpan(SpanCreationContext context) { parentSpan = getCurrentSpanInternal(); } Span span = createSpan(context, parentSpan); - setCurrentSpanInContext(span); addDefaultAttributes(span); return span; } @@ -94,10 +93,6 @@ private Span createSpan(SpanCreationContext spanCreationContext, Span parentSpan return tracingTelemetry.createSpan(spanCreationContext, parentSpan); } - private void setCurrentSpanInContext(Span span) { - tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, span); - } - /** * Adds default attributes in the span * @param span the current active span @@ -107,7 +102,7 @@ protected void addDefaultAttributes(Span span) { } @Override - public Span startSpan(SpanCreationContext spanCreationContext, Map> headers) { + public Span startSpan(SpanCreationContext spanCreationContext, Map> headers) { Optional propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers); return startSpan(spanCreationContext.parent(propagatedSpan.map(SpanContext::new).orElse(null))); } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java index 8257d251e9560..9b49ca7668992 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java @@ -9,7 +9,7 @@ package org.opensearch.telemetry.tracing; import org.opensearch.common.annotation.ExperimentalApi; -import org.opensearch.telemetry.tracing.http.HttpTracer; +import org.opensearch.telemetry.tracing.transport.TransportTracer; import java.io.Closeable; @@ -22,7 +22,7 @@ * @opensearch.experimental */ @ExperimentalApi -public interface Tracer extends HttpTracer, Closeable { +public interface Tracer extends TransportTracer, Closeable { /** * Starts the {@link Span} with given {@link SpanCreationContext} * diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java index 5fbc5d329e227..d7d48d1db10d6 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java @@ -10,7 +10,7 @@ import org.opensearch.common.annotation.ExperimentalApi; -import java.util.List; +import java.util.Collection; import java.util.Map; import java.util.Optional; import java.util.function.BiConsumer; @@ -36,7 +36,7 @@ public interface TracingContextPropagator { * @param headers request headers to extract the context from * @return current span */ - Optional extractFromHeaders(Map> headers); + Optional extractFromHeaders(Map> headers); /** * Injects tracing context diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java index 50452ff5fe3b4..c57eaccf1f3df 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java @@ -16,7 +16,7 @@ import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.Tracer; -import java.util.List; +import java.util.Collection; import java.util.Map; /** @@ -65,7 +65,7 @@ public void close() { } @Override - public Span startSpan(SpanCreationContext spanCreationContext, Map> header) { + public Span startSpan(SpanCreationContext spanCreationContext, Map> header) { return NoopSpan.INSTANCE; } } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/HttpTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/transport/TransportTracer.java similarity index 64% rename from libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/HttpTracer.java rename to libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/transport/TransportTracer.java index 50d18c0a0d040..5883d7de8e83a 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/HttpTracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/transport/TransportTracer.java @@ -6,31 +6,31 @@ * compatible open source license. */ -package org.opensearch.telemetry.tracing.http; +package org.opensearch.telemetry.tracing.transport; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.telemetry.tracing.Span; import org.opensearch.telemetry.tracing.SpanCreationContext; -import java.util.List; +import java.util.Collection; import java.util.Map; /** - * HttpTracer helps in creating a {@link Span} which reads the incoming tracing information - * from the HttpRequest header and propagate the span accordingly. + * TransportTracer helps in creating a {@link Span} which reads the incoming tracing information + * from the HTTP or TCP transport headers and propagate the span accordingly. *

* All methods on the Tracer object are multi-thread safe. * * @opensearch.experimental */ @ExperimentalApi -public interface HttpTracer { +public interface TransportTracer { /** * Start the span with propagating the tracing info from the HttpRequest header. * * @param spanCreationContext span name. - * @param header http request header. - * @return span. + * @param headers transport headers + * @return the span instance */ - Span startSpan(SpanCreationContext spanCreationContext, Map> header); + Span startSpan(SpanCreationContext spanCreationContext, Map> headers); } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/package-info.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/transport/package-info.java similarity index 65% rename from libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/package-info.java rename to libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/transport/package-info.java index 9feb862a4e010..87ffcc43184bb 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/package-info.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/transport/package-info.java @@ -7,6 +7,6 @@ */ /** - * Contains No-op implementations + * Contains HTTP or TCP transport related tracer capabilities */ -package org.opensearch.telemetry.tracing.http; +package org.opensearch.telemetry.tracing.transport; diff --git a/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java index 2a791f1ae4164..2182b3ea28ac8 100644 --- a/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java +++ b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java @@ -22,6 +22,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -35,7 +37,6 @@ public class DefaultTracerTests extends OpenSearchTestCase { private Span mockSpan; private Span mockParentSpan; - private SpanScope mockSpanScope; private ThreadPool threadPool; private ExecutorService executorService; private SpanCreationContext spanCreationContext; @@ -102,11 +103,11 @@ public void testCreateSpanWithAttributes() { Span span = defaultTracer.startSpan(spanCreationContext); - assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName()); - assertEquals(1.0, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key1")); - assertEquals(2l, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key2")); - assertEquals(true, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key3")); - assertEquals("key4", ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key4")); + assertThat(defaultTracer.getCurrentSpan(), is(nullValue())); + assertEquals(1.0, ((MockSpan) span).getAttribute("key1")); + assertEquals(2l, ((MockSpan) span).getAttribute("key2")); + assertEquals(true, ((MockSpan) span).getAttribute("key3")); + assertEquals("key4", ((MockSpan) span).getAttribute("key4")); span.endSpan(); } @@ -121,16 +122,18 @@ public void testCreateSpanWithParent() { Span span = defaultTracer.startSpan(spanCreationContext, null); - SpanContext parentSpan = defaultTracer.getCurrentSpan(); - - SpanCreationContext spanCreationContext1 = buildSpanCreationContext("span_name_1", Attributes.EMPTY, parentSpan.getSpan()); + try (final SpanScope scope = defaultTracer.withSpanInScope(span)) { + SpanContext parentSpan = defaultTracer.getCurrentSpan(); - Span span1 = defaultTracer.startSpan(spanCreationContext1); + SpanCreationContext spanCreationContext1 = buildSpanCreationContext("span_name_1", Attributes.EMPTY, parentSpan.getSpan()); - assertEquals("span_name_1", defaultTracer.getCurrentSpan().getSpan().getSpanName()); - assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan()); - span1.endSpan(); - span.endSpan(); + try (final ScopedSpan span1 = defaultTracer.startScopedSpan(spanCreationContext1)) { + assertEquals("span_name_1", defaultTracer.getCurrentSpan().getSpan().getSpanName()); + assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan()); + } + } finally { + span.endSpan(); + } } @SuppressWarnings("unchecked") @@ -155,8 +158,7 @@ public void testCreateSpanWithNullParent() { Span span = defaultTracer.startSpan(spanCreationContext); - assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName()); - assertEquals(null, defaultTracer.getCurrentSpan().getSpan().getParentSpan()); + assertThat(defaultTracer.getCurrentSpan(), is(nullValue())); span.endSpan(); } @@ -403,7 +405,6 @@ private void setupMocks() { mockTracingTelemetry = mock(TracingTelemetry.class); mockSpan = mock(Span.class); mockParentSpan = mock(Span.class); - mockSpanScope = mock(SpanScope.class); mockTracerContextStorage = mock(TracerContextStorage.class); when(mockSpan.getSpanName()).thenReturn("span_name"); when(mockSpan.getSpanId()).thenReturn("span_id"); diff --git a/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java b/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java index f07f2b308e801..156dc344d1ae2 100644 --- a/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java +++ b/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java @@ -61,20 +61,22 @@ public void testSanityChecksWhenTracingEnabled() throws Exception { // Create Index and ingest data String indexName = "test-index-11"; - Settings basicSettings = Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0).build(); + Settings basicSettings = Settings.builder() + .put("number_of_shards", 2) + .put("number_of_replicas", 0) + .put("index.routing.allocation.total_shards_per_node", 1) + .build(); createIndex(indexName, basicSettings); - indexRandom(true, client.prepareIndex(indexName).setId("1").setSource("field1", "the fox jumps in the well")); - indexRandom(true, client.prepareIndex(indexName).setId("1").setSource("field2", "another fox did the same.")); + + indexRandom(false, client.prepareIndex(indexName).setId("1").setSource("field1", "the fox jumps in the well")); + indexRandom(false, client.prepareIndex(indexName).setId("2").setSource("field2", "another fox did the same.")); ensureGreen(); refresh(); // Make the search calls; adding the searchType and PreFilterShardSize to make the query path predictable across all the runs. - client.prepareSearch().setSearchType("query_then_fetch").setPreFilterShardSize(3).setQuery(queryStringQuery("fox")).get(); - client.prepareSearch().setSearchType("query_then_fetch").setPreFilterShardSize(3).setQuery(queryStringQuery("jumps")).get(); - - ensureGreen(); - refresh(); + client.prepareSearch().setSearchType("dfs_query_then_fetch").setPreFilterShardSize(2).setQuery(queryStringQuery("fox")).get(); + client.prepareSearch().setSearchType("dfs_query_then_fetch").setPreFilterShardSize(2).setQuery(queryStringQuery("jumps")).get(); // Sleep for about 3s to wait for traces are published, delay is (the delay is 1s). Thread.sleep(3000); @@ -88,8 +90,10 @@ public void testSanityChecksWhenTracingEnabled() throws Exception { ) ); + // See please https://github.com/opensearch-project/OpenSearch/issues/10291 till local transport is not instrumented, + // capturing only the inter-nodes transport actions. InMemorySingletonSpanExporter exporter = InMemorySingletonSpanExporter.INSTANCE; - validators.validate(exporter.getFinishedSpanItems(), 6); + validators.validate(exporter.getFinishedSpanItems(), 4); } private static void updateTelemetrySetting(Client client, boolean value) { diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java index f8fe885ee450c..0fb05a08c27bb 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java @@ -10,8 +10,8 @@ import org.opensearch.core.common.Strings; +import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.BiConsumer; @@ -51,7 +51,7 @@ private static OTelPropagatedSpan getPropagatedSpan(Context context) { } @Override - public Optional extractFromHeaders(Map> headers) { + public Optional extractFromHeaders(Map> headers) { Context context = openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), headers, HEADER_TEXT_MAP_GETTER); return Optional.ofNullable(getPropagatedSpan(context)); } @@ -87,9 +87,9 @@ public String get(Map headers, String key) { } }; - private static final TextMapGetter>> HEADER_TEXT_MAP_GETTER = new TextMapGetter<>() { + private static final TextMapGetter>> HEADER_TEXT_MAP_GETTER = new TextMapGetter<>() { @Override - public Iterable keys(Map> headers) { + public Iterable keys(Map> headers) { if (headers != null) { return headers.keySet(); } else { @@ -98,7 +98,7 @@ public Iterable keys(Map> headers) { } @Override - public String get(Map> headers, String key) { + public String get(Map> headers, String key) { if (headers != null && headers.containsKey(key)) { return Strings.collectionToCommaDelimitedString(headers.get(key)); } diff --git a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java index 16a3ec9493d5d..d865a329104c1 100644 --- a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java +++ b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java @@ -11,8 +11,8 @@ import org.opensearch.test.OpenSearchTestCase; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import io.opentelemetry.api.OpenTelemetry; @@ -57,7 +57,7 @@ public void testExtractTracerContextFromHeader() { } public void testExtractTracerContextFromHttpHeader() { - Map> requestHeaders = new HashMap<>(); + Map> requestHeaders = new HashMap<>(); requestHeaders.put("traceparent", Arrays.asList("00-" + TRACE_ID + "-" + SPAN_ID + "-00")); OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class); when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance())); diff --git a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java index b8f8abb6c2c23..257aca2b67990 100644 --- a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java @@ -69,9 +69,11 @@ import java.nio.channels.CancelledKeyException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; @@ -362,7 +364,7 @@ protected void serverAcceptedChannel(HttpChannel httpChannel) { * @param httpChannel that received the http request */ public void incomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel) { - final Span span = tracer.startSpan(SpanBuilder.from(httpRequest), httpRequest.getHeaders()); + final Span span = tracer.startSpan(SpanBuilder.from(httpRequest), extractHeaders(httpRequest.getHeaders())); try (final SpanScope httpRequestSpanScope = tracer.withSpanInScope(span)) { HttpChannel traceableHttpChannel = TraceableHttpChannel.create(httpChannel, span, tracer); handleIncomingRequest(httpRequest, traceableHttpChannel, httpRequest.getInboundException()); @@ -483,4 +485,9 @@ private static ActionListener earlyResponseListener(HttpRequest request, H return NO_OP; } } + + @SuppressWarnings("unchecked") + private static > Map> extractHeaders(Map headers) { + return (Map>) headers; + } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java index 208df90f65d74..863f56d9fbe94 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java @@ -15,7 +15,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.Optional; /** * Core's ThreadContext based TracerContextStorage implementation @@ -79,17 +78,7 @@ public Map headers(Map source) { } Span getCurrentSpan(String key) { - Optional optionalSpanFromContext = spanFromThreadContext(key); - return optionalSpanFromContext.orElse(spanFromHeader()); - } - - private Optional spanFromThreadContext(String key) { SpanReference currentSpanRef = threadContext.getTransient(key); - return (currentSpanRef == null) ? Optional.empty() : Optional.ofNullable(currentSpanRef.getSpan()); - } - - private Span spanFromHeader() { - Optional span = tracingTelemetry.getContextPropagator().extract(threadContext.getHeaders()); - return span.orElse(null); + return (currentSpanRef == null) ? null : currentSpanRef.getSpan(); } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/TracerFactory.java b/server/src/main/java/org/opensearch/telemetry/tracing/TracerFactory.java index 1cb73e0247c3a..b0cecb0ee485d 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/TracerFactory.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/TracerFactory.java @@ -62,6 +62,13 @@ public void close() { } } + protected TracerContextStorage createTracerContextStorage( + TracingTelemetry tracingTelemetry, + ThreadContext threadContext + ) { + return new ThreadContextBasedTracerContextStorage(threadContext, tracingTelemetry); + } + private Tracer tracer(Optional telemetry, ThreadContext threadContext) { return telemetry.map(Telemetry::getTracingTelemetry) .map(tracingTelemetry -> createDefaultTracer(tracingTelemetry, threadContext)) @@ -70,10 +77,7 @@ private Tracer tracer(Optional telemetry, ThreadContext threadContext } private Tracer createDefaultTracer(TracingTelemetry tracingTelemetry, ThreadContext threadContext) { - TracerContextStorage tracerContextStorage = new ThreadContextBasedTracerContextStorage( - threadContext, - tracingTelemetry - ); + TracerContextStorage tracerContextStorage = createTracerContextStorage(tracingTelemetry, threadContext); return new DefaultTracer(tracingTelemetry, tracerContextStorage); } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java b/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java index 631fb8242d78e..dfe456a0a6784 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java @@ -13,7 +13,7 @@ import org.opensearch.telemetry.tracing.noop.NoopTracer; import java.io.IOException; -import java.util.List; +import java.util.Collection; import java.util.Map; /** @@ -75,7 +75,7 @@ Tracer getDelegateTracer() { } @Override - public Span startSpan(SpanCreationContext spanCreationContext, Map> headers) { + public Span startSpan(SpanCreationContext spanCreationContext, Map> headers) { return defaultTracer.startSpan(spanCreationContext, headers); } } diff --git a/server/src/main/java/org/opensearch/transport/InboundHandler.java b/server/src/main/java/org/opensearch/transport/InboundHandler.java index c14a53e799319..a8315c3cae4e0 100644 --- a/server/src/main/java/org/opensearch/transport/InboundHandler.java +++ b/server/src/main/java/org/opensearch/transport/InboundHandler.java @@ -57,6 +57,10 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.stream.Collectors; /** * Handler for inbound data @@ -188,11 +192,16 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st } } + private Map> extractHeaders(Map headers) { + return headers.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> Collections.singleton(e.getValue()))); + } + private void handleRequest(TcpChannel channel, Header header, InboundMessage message) throws IOException { final String action = header.getActionName(); final long requestId = header.getRequestId(); final Version version = header.getVersion(); - Span span = tracer.startSpan(SpanBuilder.from(action, channel)); + final Map> headers = extractHeaders(header.getHeaders().v1()); + Span span = tracer.startSpan(SpanBuilder.from(action, channel), headers); try (SpanScope spanScope = tracer.withSpanInScope(span)) { if (header.isHandshake()) { messageListener.onRequestReceived(requestId, action); diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index de88c3619abe8..5aeed72f306db 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -867,59 +867,18 @@ public final void sendRequest( final TransportRequestOptions options, final TransportResponseHandler handler ) { - final Span span = tracer.startSpan(SpanBuilder.from(action, connection)); - try (SpanScope spanScope = tracer.withSpanInScope(span)) { - TransportResponseHandler traceableTransportResponseHandler = TraceableTransportResponseHandler.create(handler, span, tracer); - try { - logger.debug("Action: " + action); - final TransportResponseHandler delegate; - if (request.getParentTask().isSet()) { - // TODO: capture the connection instead so that we can cancel child tasks on the remote connections. - final Releasable unregisterChildNode = taskManager.registerChildNode( - request.getParentTask().getId(), - connection.getNode() - ); - delegate = new TransportResponseHandler() { - @Override - public void handleResponse(T response) { - unregisterChildNode.close(); - traceableTransportResponseHandler.handleResponse(response); - } - - @Override - public void handleException(TransportException exp) { - unregisterChildNode.close(); - traceableTransportResponseHandler.handleException(exp); - } - - @Override - public String executor() { - return traceableTransportResponseHandler.executor(); - } - - @Override - public T read(StreamInput in) throws IOException { - return traceableTransportResponseHandler.read(in); - } - - @Override - public String toString() { - return getClass().getName() + "/[" + action + "]:" + handler.toString(); - } - }; - } else { - delegate = traceableTransportResponseHandler; - } - asyncSender.sendRequest(connection, action, request, options, delegate); - } catch (final Exception ex) { - // the caller might not handle this so we invoke the handler - final TransportException te; - if (ex instanceof TransportException) { - te = (TransportException) ex; - } else { - te = new TransportException("failure to send", ex); - } - traceableTransportResponseHandler.handleException(te); + if (connection == localNodeConnection) { + // See please https://github.com/opensearch-project/OpenSearch/issues/10291 + sendRequestAsync(connection, action, request, options, handler); + } else { + final Span span = tracer.startSpan(SpanBuilder.from(action, connection)); + try (SpanScope spanScope = tracer.withSpanInScope(span)) { + TransportResponseHandler traceableTransportResponseHandler = TraceableTransportResponseHandler.create( + handler, + span, + tracer + ); + sendRequestAsync(connection, action, request, options, traceableTransportResponseHandler); } } } @@ -1690,4 +1649,61 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder) } } } + + private void sendRequestAsync( + final Transport.Connection connection, + final String action, + final TransportRequest request, + final TransportRequestOptions options, + final TransportResponseHandler handler + ) { + try { + logger.debug("Action: " + action); + final TransportResponseHandler delegate; + if (request.getParentTask().isSet()) { + // TODO: capture the connection instead so that we can cancel child tasks on the remote connections. + final Releasable unregisterChildNode = taskManager.registerChildNode(request.getParentTask().getId(), connection.getNode()); + delegate = new TransportResponseHandler() { + @Override + public void handleResponse(T response) { + unregisterChildNode.close(); + handler.handleResponse(response); + } + + @Override + public void handleException(TransportException exp) { + unregisterChildNode.close(); + handler.handleException(exp); + } + + @Override + public String executor() { + return handler.executor(); + } + + @Override + public T read(StreamInput in) throws IOException { + return handler.read(in); + } + + @Override + public String toString() { + return getClass().getName() + "/[" + action + "]:" + handler.toString(); + } + }; + } else { + delegate = handler; + } + asyncSender.sendRequest(connection, action, request, options, delegate); + } catch (final Exception ex) { + // the caller might not handle this so we invoke the handler + final TransportException te; + if (ex instanceof TransportException) { + te = (TransportException) ex; + } else { + te = new TransportException("failure to send", ex); + } + handler.handleException(te); + } + } } diff --git a/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java b/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java new file mode 100644 index 0000000000000..3a98a67b53920 --- /dev/null +++ b/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java @@ -0,0 +1,174 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.util.concurrent.ThreadContext.StoredContext; +import org.opensearch.telemetry.Telemetry; +import org.opensearch.telemetry.TelemetrySettings; +import org.opensearch.telemetry.metrics.MetricsTelemetry; +import org.opensearch.telemetry.tracing.noop.NoopTracer; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.telemetry.tracing.MockTracingTelemetry; +import org.junit.After; +import org.junit.Before; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.opensearch.telemetry.TelemetrySettings.TRACER_ENABLED_SETTING; +import static org.opensearch.telemetry.TelemetrySettings.TRACER_FEATURE_ENABLED_SETTING; +import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_PROBABILITY; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; + +public class ThreadContextBasedTracerContextStorageTests extends OpenSearchTestCase { + private Tracer tracer; + private ThreadContext threadContext; + private TracerContextStorage threadContextStorage; + private ExecutorService executorService; + + @SuppressWarnings("resource") + @Before + public void setUp() throws Exception { + super.setUp(); + + final Settings settings = Settings.builder() + .put(TRACER_ENABLED_SETTING.getKey(), true) + .put(TRACER_SAMPLER_PROBABILITY.getKey(), 1d) + .put(TRACER_FEATURE_ENABLED_SETTING.getKey(), true) + .build(); + + final TelemetrySettings telemetrySettings = new TelemetrySettings( + settings, + new ClusterSettings(Settings.EMPTY, Set.of(TRACER_ENABLED_SETTING, TRACER_SAMPLER_PROBABILITY)) + ); + + final TracingTelemetry tracingTelemetry = new MockTracingTelemetry(); + + threadContext = new ThreadContext(Settings.EMPTY); + threadContextStorage = new ThreadContextBasedTracerContextStorage(threadContext, tracingTelemetry); + + tracer = new TracerFactory(telemetrySettings, Optional.of(new Telemetry() { + @Override + public MetricsTelemetry getMetricsTelemetry() { + return null; + } + + @Override + public TracingTelemetry getTracingTelemetry() { + return tracingTelemetry; + } + }), threadContext) { + @Override + protected TracerContextStorage createTracerContextStorage( + TracingTelemetry tracingTelemetry, + ThreadContext threadContext + ) { + return threadContextStorage; + } + }.getTracer(); + + executorService = Executors.newSingleThreadExecutor(); + assertThat(tracer, not(instanceOf(NoopTracer.class))); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + executorService.shutdown(); + tracer.close(); + } + + public void testStartingSpanDoesNotChangeThreadContext() { + final Span span = tracer.startSpan(SpanCreationContext.internal().name("test")); + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } + + public void testSpanInScopeChangesThreadContext() { + final Span span = tracer.startSpan(SpanCreationContext.internal().name("test")); + + try (SpanScope scope = tracer.withSpanInScope(span)) { + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(span)); + } + + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } + + public void testStashingPropagatesThreadContext() { + final Span span = tracer.startSpan(SpanCreationContext.internal().name("test")); + + try (SpanScope scope = tracer.withSpanInScope(span)) { + try (StoredContext ignored = threadContext.stashContext()) { + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(span)); + } + } + + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } + + public void testPreservingContextThreadContext() throws InterruptedException, ExecutionException, TimeoutException { + final Span span = tracer.startSpan(SpanCreationContext.internal().name("test")); + + try (SpanScope scope = tracer.withSpanInScope(span)) { + final Runnable r = new Runnable() { + @Override + public void run() { + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(span)); + } + }; + + executorService.submit(threadContext.preserveContext(r)).get(1, TimeUnit.SECONDS); + } + + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } + + public void testPreservingContextAndStashingThreadContext() throws InterruptedException, ExecutionException, TimeoutException { + final Span span = tracer.startSpan(SpanCreationContext.internal().name("test")); + + try (SpanScope scope = tracer.withSpanInScope(span)) { + final Runnable r = new Runnable() { + @Override + public void run() { + final Span local = tracer.startSpan(SpanCreationContext.internal().name("test-local")); + try (SpanScope localScope = tracer.withSpanInScope(local)) { + try (StoredContext ignored = threadContext.stashContext()) { + assertThat( + threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), + is(not(nullValue())) + ); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(local)); + } + } + } + }; + + executorService.submit(threadContext.preserveContext(r)).get(1, TimeUnit.SECONDS); + } + + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } +} diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java index 6d0cd6d0b1290..4c58352531ca8 100644 --- a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java @@ -14,7 +14,7 @@ import org.opensearch.telemetry.tracing.TracingContextPropagator; import org.opensearch.telemetry.tracing.attributes.Attributes; -import java.util.List; +import java.util.Collection; import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -52,7 +52,7 @@ public Optional extract(Map props) { } @Override - public Optional extractFromHeaders(Map> headers) { + public Optional extractFromHeaders(Map> headers) { if (headers != null) { Map convertedHeader = headers.entrySet() .stream() diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/TelemetryValidators.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/TelemetryValidators.java index 8d8c18fb9ef6b..9b5d84954908b 100644 --- a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/TelemetryValidators.java +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/TelemetryValidators.java @@ -49,7 +49,7 @@ private String printProblematicSpansMap(Map> spanMap) StringBuilder sb = new StringBuilder(); for (var entry : spanMap.entrySet()) { sb.append("SpanData validation failed for validator " + entry.getKey()); - sb.append("/n"); + sb.append("\n"); for (MockSpanData span : entry.getValue()) { sb.append(span.toString()); } diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/validators/NumberOfTraceIDsEqualToRequests.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/validators/NumberOfTraceIDsEqualToRequests.java index 5fe268a8f0581..045d3a85e21e7 100644 --- a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/validators/NumberOfTraceIDsEqualToRequests.java +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/validators/NumberOfTraceIDsEqualToRequests.java @@ -13,9 +13,9 @@ import org.opensearch.test.telemetry.tracing.TracingValidator; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Objects; -import java.util.Set; import java.util.stream.Collectors; /** @@ -41,13 +41,10 @@ public NumberOfTraceIDsEqualToRequests(Attributes attributes) { */ @Override public List validate(List spans, int requests) { - Set totalTraceIDs = spans.stream() - .filter(span -> isMatchingSpan(span)) - .map(MockSpanData::getTraceID) - .collect(Collectors.toSet()); + final Collection totalTraceIDs = spans.stream().filter(span -> isMatchingSpan(span)).collect(Collectors.toList()); List problematicSpans = new ArrayList<>(); - if (totalTraceIDs.size() != requests) { - problematicSpans.addAll(spans); + if (totalTraceIDs.stream().map(MockSpanData::getTraceID).distinct().count() != requests) { + problematicSpans.addAll(totalTraceIDs); } return problematicSpans; }