From 84d80a740440eb3d9b1a9e89f7892ef054c8cb2a Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Thu, 27 May 2021 13:12:34 +0300 Subject: [PATCH 1/2] Propagate context to armeria callbacks --- ...eamMessageSubscriptionInstrumentation.java | 47 ++++++++++++++++++ .../v1_3/ArmeriaInstrumentationModule.java | 4 +- .../armeria/v1_3/SubscriberWrapper.java | 49 +++++++++++++++++++ .../armeria/v1_3/ArmeriaHttpClientTest.groovy | 5 ++ .../v1_3/AbstractArmeriaHttpClientTest.groovy | 6 --- 5 files changed, 104 insertions(+), 7 deletions(-) create mode 100644 instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/AbstractStreamMessageSubscriptionInstrumentation.java create mode 100644 instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/SubscriberWrapper.java diff --git a/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/AbstractStreamMessageSubscriptionInstrumentation.java b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/AbstractStreamMessageSubscriptionInstrumentation.java new file mode 100644 index 000000000000..69ff45cf7ba9 --- /dev/null +++ b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/AbstractStreamMessageSubscriptionInstrumentation.java @@ -0,0 +1,47 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.armeria.v1_3; + +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import io.opentelemetry.context.Context; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.reactivestreams.Subscriber; + +public class AbstractStreamMessageSubscriptionInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return named("com.linecorp.armeria.common.stream.AbstractStreamMessage$SubscriptionImpl"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isConstructor() + .and( + takesArgument(0, named("com.linecorp.armeria.common.stream.AbstractStreamMessage"))) + .and(takesArgument(1, named("org.reactivestreams.Subscriber"))), + AbstractStreamMessageSubscriptionInstrumentation.class.getName() + "$WrapSubscriberAdvice"); + } + + public static class WrapSubscriberAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void attachContext( + @Advice.Argument(value = 1, readOnly = false) Subscriber subscriber) { + Context context = Java8BytecodeBridge.currentContext(); + if (context != Context.root()) { + subscriber = new SubscriberWrapper(subscriber, context); + } + } + } +} diff --git a/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/ArmeriaInstrumentationModule.java b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/ArmeriaInstrumentationModule.java index b7ae8a9bb8df..b85398965d36 100644 --- a/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/ArmeriaInstrumentationModule.java +++ b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/ArmeriaInstrumentationModule.java @@ -29,6 +29,8 @@ public ElementMatcher.Junction classLoaderMatcher() { @Override public List typeInstrumentations() { return asList( - new ArmeriaWebClientBuilderInstrumentation(), new ArmeriaServerBuilderInstrumentation()); + new ArmeriaWebClientBuilderInstrumentation(), + new ArmeriaServerBuilderInstrumentation(), + new AbstractStreamMessageSubscriptionInstrumentation()); } } diff --git a/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/SubscriberWrapper.java b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/SubscriberWrapper.java new file mode 100644 index 000000000000..d95599236bb1 --- /dev/null +++ b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/SubscriberWrapper.java @@ -0,0 +1,49 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.armeria.v1_3; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +public class SubscriberWrapper implements Subscriber { + private final Subscriber delegate; + private final Context context; + + public SubscriberWrapper(Subscriber delegate, Context context) { + this.delegate = delegate; + this.context = context; + } + + @Override + public void onSubscribe(Subscription subscription) { + try (Scope ignore = context.makeCurrent()) { + delegate.onSubscribe(subscription); + } + } + + @Override + public void onNext(Object o) { + try (Scope ignore = context.makeCurrent()) { + delegate.onNext(o); + } + } + + @Override + public void onError(Throwable throwable) { + try (Scope ignore = context.makeCurrent()) { + delegate.onError(throwable); + } + } + + @Override + public void onComplete() { + try (Scope ignore = context.makeCurrent()) { + delegate.onComplete(); + } + } +} diff --git a/instrumentation/armeria-1.3/library/src/test/groovy/io/opentelemetry/instrumentation/armeria/v1_3/ArmeriaHttpClientTest.groovy b/instrumentation/armeria-1.3/library/src/test/groovy/io/opentelemetry/instrumentation/armeria/v1_3/ArmeriaHttpClientTest.groovy index a0c743d9df51..6bee41bddd3f 100644 --- a/instrumentation/armeria-1.3/library/src/test/groovy/io/opentelemetry/instrumentation/armeria/v1_3/ArmeriaHttpClientTest.groovy +++ b/instrumentation/armeria-1.3/library/src/test/groovy/io/opentelemetry/instrumentation/armeria/v1_3/ArmeriaHttpClientTest.groovy @@ -26,4 +26,9 @@ class ArmeriaHttpClientTest extends AbstractArmeriaHttpClientTest implements Lib boolean testCallbackWithParent() { false } + + @Override + boolean testErrorWithCallback() { + return false + } } diff --git a/instrumentation/armeria-1.3/testing/src/main/groovy/io/opentelemetry/instrumentation/armeria/v1_3/AbstractArmeriaHttpClientTest.groovy b/instrumentation/armeria-1.3/testing/src/main/groovy/io/opentelemetry/instrumentation/armeria/v1_3/AbstractArmeriaHttpClientTest.groovy index 5c7c0f8a6783..b6b2e7d006f1 100644 --- a/instrumentation/armeria-1.3/testing/src/main/groovy/io/opentelemetry/instrumentation/armeria/v1_3/AbstractArmeriaHttpClientTest.groovy +++ b/instrumentation/armeria-1.3/testing/src/main/groovy/io/opentelemetry/instrumentation/armeria/v1_3/AbstractArmeriaHttpClientTest.groovy @@ -63,12 +63,6 @@ abstract class AbstractArmeriaHttpClientTest extends HttpClientTest false } - // TODO: context not propagated to callback - @Override - boolean testErrorWithCallback() { - return false - } - @Override List> extraAttributes() { [ From 5487245b3d6a54144a442ff15b8df77a13603594 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Thu, 27 May 2021 19:06:24 +0300 Subject: [PATCH 2/2] review --- ...stractStreamMessageSubscriptionInstrumentation.java | 7 +------ .../armeria/v1_3/SubscriberWrapper.java | 10 +++++++++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/AbstractStreamMessageSubscriptionInstrumentation.java b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/AbstractStreamMessageSubscriptionInstrumentation.java index 69ff45cf7ba9..e36e6fca0f43 100644 --- a/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/AbstractStreamMessageSubscriptionInstrumentation.java +++ b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/AbstractStreamMessageSubscriptionInstrumentation.java @@ -9,10 +9,8 @@ import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import io.opentelemetry.context.Context; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -38,10 +36,7 @@ public static class WrapSubscriberAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void attachContext( @Advice.Argument(value = 1, readOnly = false) Subscriber subscriber) { - Context context = Java8BytecodeBridge.currentContext(); - if (context != Context.root()) { - subscriber = new SubscriberWrapper(subscriber, context); - } + subscriber = SubscriberWrapper.wrap(subscriber); } } } diff --git a/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/SubscriberWrapper.java b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/SubscriberWrapper.java index d95599236bb1..a28d33fa7f0d 100644 --- a/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/SubscriberWrapper.java +++ b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/SubscriberWrapper.java @@ -14,11 +14,19 @@ public class SubscriberWrapper implements Subscriber { private final Subscriber delegate; private final Context context; - public SubscriberWrapper(Subscriber delegate, Context context) { + private SubscriberWrapper(Subscriber delegate, Context context) { this.delegate = delegate; this.context = context; } + public static Subscriber wrap(Subscriber delegate) { + Context context = Context.current(); + if (context != Context.root()) { + return new SubscriberWrapper(delegate, context); + } + return delegate; + } + @Override public void onSubscribe(Subscription subscription) { try (Scope ignore = context.makeCurrent()) {