Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow reactor instrumentation to pick up spans from reactor context #4159

Merged
merged 9 commits into from
Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ package io.opentelemetry.instrumentation.lettuce.v5_1

import io.lettuce.core.RedisClient
import io.lettuce.core.resource.ClientResources
import io.opentelemetry.instrumentation.reactor.TracingOperator
import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import spock.lang.Shared

class LettuceReactiveClientTest extends AbstractLettuceReactiveClientTest implements LibraryTestTrait {
@Shared
TracingOperator tracingOperator = TracingOperator.create()
ContextPropagationOperator tracingOperator = ContextPropagationOperator.create()

@Override
RedisClient createClient(String uri) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.reactor.TracingOperator;
import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
Expand All @@ -34,7 +34,7 @@ public static class ResetOnEachOperatorAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void postStaticInitializer() {
TracingOperator.newBuilder()
ContextPropagationOperator.newBuilder()
.setCaptureExperimentalSpanAttributes(
Config.get()
.getBoolean("otel.instrumentation.reactor.experimental-span-attributes", false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.context.Scope
import io.opentelemetry.instrumentation.reactor.TracedWithSpan
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import reactor.core.publisher.Flux
Expand Down Expand Up @@ -70,6 +73,134 @@ class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecificati
}
}

def "should capture nested Mono spans"() {
setup:
def mono = Mono.defer({ ->
Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("inner-manual").startSpan()
span.end()
return Mono.just("Value")
})

def result = new TracedWithSpan()
.outer(mono)

StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

expect:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "TracedWithSpan.outer"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
}
}
span(1) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
childOf span(0)
attributes {
}
}
span(2) {
name "inner-manual"
kind SpanKind.INTERNAL
childOf span(1)
attributes {
}
}
}
}
}

def "should capture nested spans from current"() {
setup:
Span parent = GlobalOpenTelemetry.getTracer("test")
.spanBuilder("parent").startSpan()

Scope scope = parent.makeCurrent()

def result = new TracedWithSpan()
.mono(Mono.defer({ ->
Span inner = GlobalOpenTelemetry.getTracer("test").spanBuilder("inner-manual").startSpan()
inner.end()
return Mono.just("Value")
}))

StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

scope.close()
parent.end()

expect:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
}
}
span(1) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
childOf span(0)
attributes {
}
}
span(2) {
name "inner-manual"
kind SpanKind.INTERNAL
childOf span(1)
attributes {
}
}
}
}
}

def "should capture nested Flux spans"() {
setup:
def mono = Flux.defer({ ->
Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("inner-manual").startSpan()
span.end()
return Flux.just("Value")
})

def result = new TracedWithSpan()
.flux(mono)

StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

expect:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "TracedWithSpan.flux"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
}
}
span(1) {
name "inner-manual"
kind SpanKind.INTERNAL
childOf span(0)
attributes {
}
}
}
}
}

def "should capture span for already errored Mono"() {
setup:
def error = new IllegalArgumentException("Boom")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ public Mono<String> mono(Mono<String> mono) {
return mono;
}

@WithSpan
public Mono<String> outer(Mono<String> inner) {
return mono(inner);
}

@WithSpan
public Flux<String> flux(Flux<String> flux) {
return flux;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,62 @@
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

/** Based on Spring Sleuth's Reactor instrumentation. */
public final class TracingOperator {
public final class ContextPropagationOperator {

public static TracingOperator create() {
public static ContextPropagationOperator create() {
return newBuilder().build();
}

public static TracingOperatorBuilder newBuilder() {
return new TracingOperatorBuilder();
public static ContextPropagationOperatorBuilder newBuilder() {
return new ContextPropagationOperatorBuilder();
}

private final ReactorAsyncOperationEndStrategy asyncOperationEndStrategy;

TracingOperator(boolean captureExperimentalSpanAttributes) {
private static final Object TRACE_CONTEXT_KEY =
new Object() {
@Override
public String toString() {
return "otel-trace-context";
}
};

private static volatile Mono<String> dummyMono = Mono.just("");
private static volatile Flux<String> dummyFlux = Flux.just("");

/**
* Stores Trace {@link io.opentelemetry.context.Context} in Reactor {@link
* reactor.util.context.Context}.
*
* @param context Reactor's context to store trace context in.
* @param traceContext Trace context to be stored.
*/
public static reactor.util.context.Context storeOpenTelemetryContext(
reactor.util.context.Context context, Context traceContext) {
return context.put(TRACE_CONTEXT_KEY, traceContext);
}

/**
* Gets Trace {@link io.opentelemetry.context.Context} from Reactor {@link
* reactor.util.context.Context}.
*
* @param context Reactor's context to get trace context from.
* @param defaultTraceContext Default value to be returned if no trace context is found on Reactor
* context.
* @return Trace context or default value.
*/
public static Context getOpenTelemetryContext(
reactor.util.context.Context context, Context defaultTraceContext) {
return context.getOrDefault(TRACE_CONTEXT_KEY, defaultTraceContext);
}

ContextPropagationOperator(boolean captureExperimentalSpanAttributes) {
this.asyncOperationEndStrategy =
ReactorAsyncOperationEndStrategy.newBuilder()
.setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes)
Expand All @@ -62,19 +101,47 @@ public static TracingOperatorBuilder newBuilder() {
public void registerOnEachOperator() {
Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift(asyncOperationEndStrategy));
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
resetDummy();
}

/** Unregisters the hook registered by {@link #registerOnEachOperator()}. */
public void resetOnEachOperator() {
Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
AsyncOperationEndStrategies.instance().unregisterStrategy(asyncOperationEndStrategy);
resetDummy();
}

private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift(
ReactorAsyncOperationEndStrategy asyncOperationEndStrategy) {
return Operators.lift(new Lifter<>(asyncOperationEndStrategy));
}

/** Forces Mono to run in traceContext scope. */
static <T> Mono<T> runWithContext(Mono<T> publisher, Context tracingContext) {
// this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber`
// (created for this publisher) and with current() span that refers to span created here
// without the hack, publisher runs in the onAssembly stage, before traceContext is made current
return dummyMono
.flatMap(i -> publisher)
.subscriberContext(ctx -> storeOpenTelemetryContext(ctx, tracingContext));
}

/** Forces Flux to run in traceContext scope. */
static <T> Flux<T> runWithContext(Flux<T> publisher, Context tracingContext) {
// this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber`
// (created for this publisher) and with current() span that refers to span created here
// without the hack, publisher runs in the onAssembly stage, before traceContext is made current
return dummyFlux
.flatMap(i -> publisher)
.subscriberContext(ctx -> storeOpenTelemetryContext(ctx, tracingContext));
}

private static synchronized void resetDummy() {
// have to be reset as they capture async strategy and lift
dummyMono = Mono.just("");
dummyFlux = Flux.just("");
}

public static class Lifter<T>
implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@

package io.opentelemetry.instrumentation.reactor;

public final class TracingOperatorBuilder {
public final class ContextPropagationOperatorBuilder {
private boolean captureExperimentalSpanAttributes;

TracingOperatorBuilder() {}
ContextPropagationOperatorBuilder() {}

public TracingOperatorBuilder setCaptureExperimentalSpanAttributes(
public ContextPropagationOperatorBuilder setCaptureExperimentalSpanAttributes(
boolean captureExperimentalSpanAttributes) {
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
return this;
}

public TracingOperator build() {
return new TracingOperator(captureExperimentalSpanAttributes);
public ContextPropagationOperator build() {
return new ContextPropagationOperator(captureExperimentalSpanAttributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,15 @@ protected void end(Object result, Throwable error) {

if (asyncValue instanceof Mono) {
Mono<?> mono = (Mono<?>) asyncValue;
return mono.doOnError(notificationConsumer)

return ContextPropagationOperator.runWithContext(mono, context)
.doOnError(notificationConsumer)
.doOnSuccess(notificationConsumer::onSuccess)
.doOnCancel(notificationConsumer::onCancel);
} else {
Flux<?> flux = Flux.from((Publisher<?>) asyncValue);
return flux.doOnError(notificationConsumer)
return ContextPropagationOperator.runWithContext(flux, context)
.doOnError(notificationConsumer)
.doOnComplete(notificationConsumer)
.doOnCancel(notificationConsumer::onCancel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ public TracingSubscriber(
io.opentelemetry.context.Context contextToPropagate) {
this.subscriber = subscriber;
this.context = ctx;
this.traceContext = contextToPropagate;
this.traceContext = ContextPropagationOperator.getOpenTelemetryContext(ctx, contextToPropagate);
}

@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(subscription);
withActiveSpan(() -> subscriber.onSubscribe(subscription));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class HooksTest extends LibraryInstrumentationSpecification {

def "can reset out hooks"() {
setup:
def underTest = TracingOperator.create()
def underTest = ContextPropagationOperator.create()
AtomicReference<CoreSubscriber> subscriber = new AtomicReference<>()

when: "no hook registered"
Expand Down
Loading