From 00080fcfe49de3c4d1a28cc4660ac3ceff4cae43 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Wed, 28 Apr 2021 18:01:20 +0200 Subject: [PATCH] make reactor 0.9 behave the same way 1.0 does, add request error test --- .../reactornetty/v0_9/DecoratorFunctions.java | 20 ++++++--- .../ReactorNettyInstrumentationModule.java | 7 ++- .../AbstractReactorNettyHttpClientTest.groovy | 43 ++++++++++++++++++- 3 files changed, 62 insertions(+), 8 deletions(-) diff --git a/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/DecoratorFunctions.java b/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/DecoratorFunctions.java index 5968ab31e461..bf6dc7b5ec8d 100644 --- a/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/DecoratorFunctions.java +++ b/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/DecoratorFunctions.java @@ -24,14 +24,23 @@ public static boolean shouldDecorate(Class callbackClass) { private abstract static class OnMessageDecorator implements BiConsumer { private final BiConsumer delegate; + private final boolean forceParentContext; - public OnMessageDecorator(BiConsumer delegate) { + public OnMessageDecorator(BiConsumer delegate, + boolean forceParentContext) { this.delegate = delegate; + this.forceParentContext = forceParentContext; } @Override public final void accept(M message, Connection connection) { - Context context = getChannelContext(currentContext(message), connection.channel()); + Channel channel = connection.channel(); + // don't try to get the client span from the netty channel when forceParentSpan is true + // this way the parent context will always be propagated + if (forceParentContext) { + channel = null; + } + Context context = getChannelContext(currentContext(message), channel); if (context == null) { delegate.accept(message, connection); } else { @@ -46,7 +55,7 @@ public final void accept(M message, Connection connection) { public static final class OnRequestDecorator extends OnMessageDecorator { public OnRequestDecorator(BiConsumer delegate) { - super(delegate); + super(delegate, false); } @Override @@ -57,8 +66,9 @@ reactor.util.context.Context currentContext(HttpClientRequest message) { public static final class OnResponseDecorator extends OnMessageDecorator { public OnResponseDecorator( - BiConsumer delegate) { - super(delegate); + BiConsumer delegate, + boolean forceParentContext) { + super(delegate, forceParentContext); } @Override diff --git a/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java b/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java index 7ff0cd0bb148..65b4c43ac744 100644 --- a/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java +++ b/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyInstrumentationModule.java @@ -171,9 +171,12 @@ public static class OnResponseAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void onEnter( @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + BiConsumer callback, + @Advice.Origin("#m") String methodName) { if (DecoratorFunctions.shouldDecorate(callback.getClass())) { - callback = new DecoratorFunctions.OnResponseDecorator(callback); + boolean forceParentContext = methodName.equals("doAfterResponse"); + callback = new DecoratorFunctions.OnResponseDecorator(callback, + forceParentContext); } } } diff --git a/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/AbstractReactorNettyHttpClientTest.groovy b/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/AbstractReactorNettyHttpClientTest.groovy index 2974092cd38b..ad5bd95a665c 100644 --- a/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/AbstractReactorNettyHttpClientTest.groovy +++ b/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/AbstractReactorNettyHttpClientTest.groovy @@ -3,10 +3,14 @@ * SPDX-License-Identifier: Apache-2.0 */ + +import static io.opentelemetry.instrumentation.test.utils.PortUtils.UNUSABLE_PORT import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace import io.opentelemetry.api.trace.Span +import io.opentelemetry.api.trace.SpanKind +import io.opentelemetry.api.trace.StatusCode import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.sdk.trace.data.SpanData @@ -94,11 +98,48 @@ abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest() + + def httpClient = createHttpClient() + .doOnRequestError({ rq, err -> onRequestErrorSpan.set(Span.current()) }) + + when: + runUnderTrace("parent") { + httpClient.get() + .uri("http://localhost:$UNUSABLE_PORT/") + .response() + .block() + } + + then: + def ex = thrown(Exception) + + assertTraces(1) { + trace(0, 2) { + def parentSpan = span(0) + + basicSpan(it, 0, "parent", null, ex) + span(1) { + def actualException = ex.cause + kind SpanKind.CLIENT + childOf parentSpan + status StatusCode.ERROR + errorEvent(actualException.class, actualException.message) + } + + assertSameSpan(parentSpan, onRequestErrorSpan) } } } + private static void assertSameSpan(SpanData expected, AtomicReference actual) { def expectedSpanContext = expected.spanContext def actualSpanContext = actual.get().spanContext