diff --git a/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/AbstractStreamMessageSubscriptionInstrumentation.java b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/AbstractStreamMessageSubscriptionInstrumentation.java index 05b432352e69..c385ae461532 100644 --- a/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/AbstractStreamMessageSubscriptionInstrumentation.java +++ b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/AbstractStreamMessageSubscriptionInstrumentation.java @@ -11,6 +11,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import java.util.concurrent.CompletableFuture; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -30,15 +31,32 @@ public void transform(TypeTransformer transformer) { takesArgument(0, named("com.linecorp.armeria.common.stream.AbstractStreamMessage"))) .and(takesArgument(1, named("org.reactivestreams.Subscriber"))), AbstractStreamMessageSubscriptionInstrumentation.class.getName() + "$WrapSubscriberAdvice"); + // since 1.9.0 + transformer.applyAdviceToMethod( + isConstructor() + .and( + takesArgument(0, named("com.linecorp.armeria.common.stream.AbstractStreamMessage"))) + .and(takesArgument(4, named("java.util.concurrent.CompletableFuture"))), + AbstractStreamMessageSubscriptionInstrumentation.class.getName() + + "$WrapCompletableFutureAdvice"); } @SuppressWarnings("unused") public static class WrapSubscriberAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static void attachContext( - @Advice.Argument(value = 1, readOnly = false) Subscriber subscriber) { + public static void wrapSubscriber( + @Advice.Argument(value = 1, readOnly = false) Subscriber subscriber) { subscriber = SubscriberWrapper.wrap(subscriber); } } + + public static class WrapCompletableFutureAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void wrapCompletableFuture( + @Advice.Argument(value = 4, readOnly = false) CompletableFuture future) { + future = CompletableFutureWrapper.wrap(future); + } + } } diff --git a/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/CompletableFutureWrapper.java b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/CompletableFutureWrapper.java new file mode 100644 index 000000000000..1815f7c3ca0b --- /dev/null +++ b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/CompletableFutureWrapper.java @@ -0,0 +1,42 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.armeria.v1_3; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import java.util.concurrent.CompletableFuture; + +public final class CompletableFutureWrapper { + + private CompletableFutureWrapper() {} + + public static CompletableFuture wrap(CompletableFuture future) { + if (future == null) { + return null; + } + Context context = Context.current(); + if (context != Context.root()) { + return wrap(future, context); + } + return future; + } + + private static CompletableFuture wrap(CompletableFuture future, Context context) { + CompletableFuture result = new CompletableFuture<>(); + result.whenComplete( + (T value, Throwable throwable) -> { + try (Scope ignored = context.makeCurrent()) { + if (throwable != null) { + future.completeExceptionally(throwable); + } else { + future.complete(value); + } + } + }); + + return result; + } +} diff --git a/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/SubscriberWrapper.java b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/SubscriberWrapper.java index ef006aec6471..2b1b5a1d30c8 100644 --- a/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/SubscriberWrapper.java +++ b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/SubscriberWrapper.java @@ -10,19 +10,49 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -public class SubscriberWrapper implements Subscriber { - private final Subscriber delegate; +public class SubscriberWrapper implements Subscriber { + private static final Class abortingSubscriberClass = getAbortingSubscriberClass(); + private static final Class noopSubscriberClass = getNoopSubscriberClass(); + + private final Subscriber delegate; private final Context context; - private SubscriberWrapper(Subscriber delegate, Context context) { + private static Class getAbortingSubscriberClass() { + // AbortingSubscriber is package private + try { + return Class.forName("com.linecorp.armeria.common.stream.AbortingSubscriber"); + } catch (ClassNotFoundException exception) { + return null; + } + } + + private static Class getNoopSubscriberClass() { + // NoopSubscriber is package private + try { + return Class.forName("com.linecorp.armeria.common.stream.NoopSubscriber"); + } catch (ClassNotFoundException exception) { + return null; + } + } + + private SubscriberWrapper(Subscriber delegate, Context context) { this.delegate = delegate; this.context = context; } - public static Subscriber wrap(Subscriber delegate) { + private static boolean isIgnored(Subscriber delegate) { + return (abortingSubscriberClass != null && abortingSubscriberClass.isInstance(delegate)) + || (noopSubscriberClass != null && noopSubscriberClass.isInstance(delegate)); + } + + public static Subscriber wrap(Subscriber delegate) { + if (isIgnored(delegate)) { + return delegate; + } + Context context = Context.current(); if (context != Context.root()) { - return new SubscriberWrapper(delegate, context); + return new SubscriberWrapper<>(delegate, context); } return delegate; } @@ -35,7 +65,7 @@ public void onSubscribe(Subscription subscription) { } @Override - public void onNext(Object o) { + public void onNext(T o) { try (Scope ignored = context.makeCurrent()) { delegate.onNext(o); }