From acf1eddb5bb69f73a121527a01cc687a61d37878 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Mon, 31 May 2021 13:02:24 -0400 Subject: [PATCH 01/12] Stop span on cancellation of subscription to reactive publisher --- .../reactor/ReactorAsyncSpanEndStrategy.java | 13 +- .../ReactorAsyncSpanEndStrategyTest.groovy | 42 ++++++ .../rxjava2/RxJava2AsyncSpanEndStrategy.java | 30 ++-- .../RxJava2AsyncSpanEndStrategyTest.groovy | 135 ++++++++++++++++++ .../rxjava3/RxJava3AsyncSpanEndStrategy.java | 30 ++-- .../RxJava3AsyncSpanEndStrategyTest.groovy | 135 ++++++++++++++++++ 6 files changed, 365 insertions(+), 20 deletions(-) diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategy.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategy.java index 133a9f2ea672..9f02e99fd9a2 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategy.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategy.java @@ -8,6 +8,7 @@ import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.tracer.BaseTracer; import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy; +import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import org.reactivestreams.Publisher; @@ -29,10 +30,14 @@ public Object end(BaseTracer tracer, Context context, Object returnValue) { new EndOnFirstNotificationConsumer(tracer, context); if (returnValue instanceof Mono) { Mono mono = (Mono) returnValue; - return mono.doOnError(notificationConsumer).doOnSuccess(notificationConsumer::onSuccess); + return mono.doOnError(notificationConsumer) + .doOnSuccess(notificationConsumer::onSuccess) + .doOnCancel(notificationConsumer::onCancel); } else { Flux flux = Flux.from((Publisher) returnValue); - return flux.doOnError(notificationConsumer).doOnComplete(notificationConsumer); + return flux.doOnError(notificationConsumer) + .doOnComplete(notificationConsumer) + .doOnCancel(notificationConsumer::onCancel); } } @@ -57,6 +62,10 @@ public void onSuccess(T ignored) { accept(null); } + public void onCancel() { + accept(new CancellationException()); + } + @Override public void run() { accept(null); diff --git a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy index dadbc2abf8be..1c486c018ff4 100644 --- a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy +++ b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy @@ -13,6 +13,8 @@ import reactor.core.publisher.UnicastProcessor import reactor.test.StepVerifier import spock.lang.Specification +import java.util.concurrent.CancellationException + class ReactorAsyncSpanEndStrategyTest extends Specification { BaseTracer tracer @@ -131,6 +133,26 @@ class ReactorAsyncSpanEndStrategyTest extends Specification { 1 * tracer.endExceptionally(context, exception) } + def "ends span when cancelled"() { + given: + def source = UnicastProcessor.create() + def mono = source.singleOrEmpty() + + when: + def result = (Mono) underTest.end(tracer, context, mono) + def verifier = StepVerifier.create(result) + .expectSubscription() + + then: + 0 * tracer._ + + when: + verifier.thenCancel().verify() + + then: + 1 * tracer.endExceptionally(context, _ as CancellationException) + } + def "ends span once for multiple subscribers"() { when: @@ -253,6 +275,26 @@ class ReactorAsyncSpanEndStrategyTest extends Specification { 1 * tracer.endExceptionally(context, exception) } + def "ends span when cancelled"() { + given: + def source = UnicastProcessor.create() + + when: + def result = (Flux) underTest.end(tracer, context, source) + def verifier = StepVerifier.create(result) + .expectSubscription() + + then: + 0 * tracer._ + + when: + verifier.thenCancel() + .verify() + + then: + 1 * tracer.endExceptionally(context, _ as CancellationException) + } + def "ends span once for multiple subscribers"() { when: def result = (Flux) underTest.end(tracer, context, Flux.just("Value")) diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategy.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategy.java index 495b9d888814..37c1e2b708a6 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategy.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategy.java @@ -17,6 +17,7 @@ import io.reactivex.functions.BiConsumer; import io.reactivex.functions.Consumer; import io.reactivex.parallel.ParallelFlowable; +import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicBoolean; import org.reactivestreams.Publisher; @@ -55,7 +56,9 @@ public Object end(BaseTracer tracer, Context context, Object returnValue) { private static Completable endWhenComplete( Completable completable, EndOnFirstNotificationConsumer notificationConsumer) { - return completable.doOnEvent(notificationConsumer); + return completable + .doOnEvent(notificationConsumer) + .doOnDispose(notificationConsumer::onCancelOrDispose); } private static Maybe endWhenMaybeComplete( @@ -63,7 +66,7 @@ private static Maybe endWhenMaybeComplete( @SuppressWarnings("unchecked") EndOnFirstNotificationConsumer typedConsumer = (EndOnFirstNotificationConsumer) notificationConsumer; - return maybe.doOnEvent(typedConsumer); + return maybe.doOnEvent(typedConsumer).doOnDispose(notificationConsumer::onCancelOrDispose); } private static Single endWhenSingleComplete( @@ -71,25 +74,32 @@ private static Single endWhenSingleComplete( @SuppressWarnings("unchecked") EndOnFirstNotificationConsumer typedConsumer = (EndOnFirstNotificationConsumer) notificationConsumer; - return single.doOnEvent(typedConsumer); + return single.doOnEvent(typedConsumer).doOnDispose(notificationConsumer::onCancelOrDispose); } private static Observable endWhenObservableComplete( Observable observable, EndOnFirstNotificationConsumer notificationConsumer) { - return observable.doOnComplete(notificationConsumer).doOnError(notificationConsumer); + return observable + .doOnComplete(notificationConsumer) + .doOnError(notificationConsumer) + .doOnDispose(notificationConsumer::onCancelOrDispose); } private static ParallelFlowable endWhenFirstComplete( ParallelFlowable parallelFlowable, EndOnFirstNotificationConsumer notificationConsumer) { - return parallelFlowable.doOnComplete(notificationConsumer).doOnError(notificationConsumer); + return parallelFlowable + .doOnComplete(notificationConsumer) + .doOnError(notificationConsumer) + .doOnCancel(notificationConsumer::onCancelOrDispose); } private static Flowable endWhenPublisherComplete( Publisher publisher, EndOnFirstNotificationConsumer notificationConsumer) { return Flowable.fromPublisher(publisher) .doOnComplete(notificationConsumer) - .doOnError(notificationConsumer); + .doOnError(notificationConsumer) + .doOnCancel(notificationConsumer::onCancelOrDispose); } /** @@ -111,9 +121,11 @@ public EndOnFirstNotificationConsumer(BaseTracer tracer, Context context) { @Override public void run() { - if (compareAndSet(false, true)) { - tracer.end(context); - } + accept(null); + } + + public void onCancelOrDispose() { + accept(new CancellationException()); } @Override diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2AsyncSpanEndStrategyTest.groovy b/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2AsyncSpanEndStrategyTest.groovy index b2d2ea711975..8a8d990afa07 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2AsyncSpanEndStrategyTest.groovy +++ b/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2AsyncSpanEndStrategyTest.groovy @@ -26,6 +26,8 @@ import org.reactivestreams.Subscriber import org.reactivestreams.Subscription import spock.lang.Specification +import java.util.concurrent.CancellationException + class RxJava2AsyncSpanEndStrategyTest extends Specification { BaseTracer tracer @@ -112,6 +114,25 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = CompletableSubject.create() + def observer = new TestObserver() + + when: + def result = (Completable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.endExceptionally(context, _ as CancellationException) + } + def "ends span once for multiple subscribers"() { given: def source = CompletableSubject.create() @@ -246,6 +267,25 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = MaybeSubject.create() + def observer = new TestObserver() + + when: + def result = (Maybe) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.endExceptionally(context, _ as CancellationException) + } + def "ends span once for multiple subscribers"() { given: def source = MaybeSubject.create() @@ -350,6 +390,25 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when errored"() { + given: + def source = SingleSubject.create() + def observer = new TestObserver() + + when: + def result = (Single) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.endExceptionally(context, _ as CancellationException) + } + def "ends span once for multiple subscribers"() { given: def source = SingleSubject.create() @@ -454,6 +513,25 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = UnicastSubject.create() + def observer = new TestObserver() + + when: + def result = (Observable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.endExceptionally(context, _ as CancellationException) + } + def "ends span once for multiple subscribers"() { given: def source = ReplaySubject.create() @@ -555,6 +633,25 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + + when: + def result = (Flowable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.endExceptionally(context, _ as CancellationException) + } + def "ends span once for multiple subscribers"() { given: def source = ReplayProcessor.create() @@ -655,6 +752,25 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) 1 * tracer.endExceptionally(context, exception) } + + def "ends span when cancelled"() { + given: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + + when: + def result = (ParallelFlowable) underTest.end(tracer, context, source.parallel()) + result.sequential().subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.endExceptionally(context, _ as CancellationException) + } } static class PublisherTest extends RxJava2AsyncSpanEndStrategyTest { @@ -703,6 +819,25 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { 1 * tracer.endExceptionally(context, exception) observer.assertError(exception) } + + def "ends span when cancelled"() { + given: + def source = new CustomPublisher() + def observer = new TestSubscriber() + + when: + def result = (Flowable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.endExceptionally(context, _ as CancellationException) + } } static class CustomPublisher implements Publisher, Subscription { diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategy.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategy.java index c2aecbc191b3..04b91ebe6fae 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategy.java +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategy.java @@ -17,6 +17,7 @@ import io.reactivex.rxjava3.functions.BiConsumer; import io.reactivex.rxjava3.functions.Consumer; import io.reactivex.rxjava3.parallel.ParallelFlowable; +import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicBoolean; import org.reactivestreams.Publisher; @@ -55,7 +56,9 @@ public Object end(BaseTracer tracer, Context context, Object returnValue) { private static Completable endWhenComplete( Completable completable, EndOnFirstNotificationConsumer notificationConsumer) { - return completable.doOnEvent(notificationConsumer); + return completable + .doOnEvent(notificationConsumer) + .doOnDispose(notificationConsumer::onCancelOrDispose); } private static Maybe endWhenMaybeComplete( @@ -63,7 +66,7 @@ private static Maybe endWhenMaybeComplete( @SuppressWarnings("unchecked") EndOnFirstNotificationConsumer typedConsumer = (EndOnFirstNotificationConsumer) notificationConsumer; - return maybe.doOnEvent(typedConsumer); + return maybe.doOnEvent(typedConsumer).doOnDispose(notificationConsumer::onCancelOrDispose); } private static Single endWhenSingleComplete( @@ -71,25 +74,32 @@ private static Single endWhenSingleComplete( @SuppressWarnings("unchecked") EndOnFirstNotificationConsumer typedConsumer = (EndOnFirstNotificationConsumer) notificationConsumer; - return single.doOnEvent(typedConsumer); + return single.doOnEvent(typedConsumer).doOnDispose(notificationConsumer::onCancelOrDispose); } private static Observable endWhenObservableComplete( Observable observable, EndOnFirstNotificationConsumer notificationConsumer) { - return observable.doOnComplete(notificationConsumer).doOnError(notificationConsumer); + return observable + .doOnComplete(notificationConsumer) + .doOnError(notificationConsumer) + .doOnDispose(notificationConsumer::onCancelOrDispose); } private static ParallelFlowable endWhenFirstComplete( ParallelFlowable parallelFlowable, EndOnFirstNotificationConsumer notificationConsumer) { - return parallelFlowable.doOnComplete(notificationConsumer).doOnError(notificationConsumer); + return parallelFlowable + .doOnComplete(notificationConsumer) + .doOnError(notificationConsumer) + .doOnCancel(notificationConsumer::onCancelOrDispose); } private static Flowable endWhenPublisherComplete( Publisher publisher, EndOnFirstNotificationConsumer notificationConsumer) { return Flowable.fromPublisher(publisher) .doOnComplete(notificationConsumer) - .doOnError(notificationConsumer); + .doOnError(notificationConsumer) + .doOnCancel(notificationConsumer::onCancelOrDispose); } /** @@ -109,11 +119,13 @@ public EndOnFirstNotificationConsumer(BaseTracer tracer, Context context) { this.context = context; } + public void onCancelOrDispose() { + accept(new CancellationException()); + } + @Override public void run() { - if (compareAndSet(false, true)) { - tracer.end(context); - } + accept(null); } @Override diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3AsyncSpanEndStrategyTest.groovy b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3AsyncSpanEndStrategyTest.groovy index 3df4f6cb8a10..bf0e287368ff 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3AsyncSpanEndStrategyTest.groovy +++ b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3AsyncSpanEndStrategyTest.groovy @@ -26,6 +26,8 @@ import org.reactivestreams.Subscriber import org.reactivestreams.Subscription import spock.lang.Specification +import java.util.concurrent.CancellationException + class RxJava3AsyncSpanEndStrategyTest extends Specification { BaseTracer tracer @@ -112,6 +114,25 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = CompletableSubject.create() + def observer = new TestObserver() + + when: + def result = (Completable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.dispose() + + then: + 1 * tracer.endExceptionally(context, _ as CancellationException) + } + def "ends span once for multiple subscribers"() { given: def source = CompletableSubject.create() @@ -246,6 +267,25 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = MaybeSubject.create() + def observer = new TestObserver() + + when: + def result = (Maybe) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.dispose() + + then: + 1 * tracer.endExceptionally(context, _ as CancellationException) + } + def "ends span once for multiple subscribers"() { given: def source = MaybeSubject.create() @@ -350,6 +390,25 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = SingleSubject.create() + def observer = new TestObserver() + + when: + def result = (Single) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.dispose() + + then: + 1 * tracer.endExceptionally(context, _ as CancellationException) + } + def "ends span once for multiple subscribers"() { given: def source = SingleSubject.create() @@ -454,6 +513,25 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = UnicastSubject.create() + def observer = new TestObserver() + + when: + def result = (Observable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.dispose() + + then: + 1 * tracer.endExceptionally(context, _ as CancellationException) + } + def "ends span once for multiple subscribers"() { given: def source = ReplaySubject.create() @@ -555,6 +633,25 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } + def "ends span when cancelled"() { + given: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + + when: + def result = (Flowable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.endExceptionally(context, _ as CancellationException) + } + def "ends span once for multiple subscribers"() { given: def source = ReplayProcessor.create() @@ -655,6 +752,25 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) 1 * tracer.endExceptionally(context, exception) } + + def "ends span when cancelled"() { + given: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + + when: + def result = (ParallelFlowable) underTest.end(tracer, context, source.parallel()) + result.sequential().subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.endExceptionally(context, _ as CancellationException) + } } static class PublisherTest extends RxJava3AsyncSpanEndStrategyTest { @@ -703,6 +819,25 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { 1 * tracer.endExceptionally(context, exception) observer.assertError(exception) } + + def "ends span when cancelled"() { + given: + def source = new CustomPublisher() + def observer = new TestSubscriber() + + when: + def result = (Flowable) underTest.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + + when: + observer.cancel() + + then: + 1 * tracer.endExceptionally(context, _ as CancellationException) + } } static class CustomPublisher implements Publisher, Subscription { From 78d837881b50984164f47b6cbd2714f8badf2ee9 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Mon, 31 May 2021 14:10:04 -0400 Subject: [PATCH 02/12] Fix test name --- .../src/test/groovy/RxJava2AsyncSpanEndStrategyTest.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2AsyncSpanEndStrategyTest.groovy b/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2AsyncSpanEndStrategyTest.groovy index 8a8d990afa07..42da06f7aa6f 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2AsyncSpanEndStrategyTest.groovy +++ b/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2AsyncSpanEndStrategyTest.groovy @@ -390,7 +390,7 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.assertError(exception) } - def "ends span when errored"() { + def "ends span when cancelled"() { given: def source = SingleSubject.create() def observer = new TestObserver() From 25b2a20504fd2ee632257aa31269e0e9c35538fa Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Tue, 1 Jun 2021 10:36:38 -0400 Subject: [PATCH 03/12] Add semantic attribute on cancelation of reactive publisher --- .../reactor/ReactorAsyncSpanEndStrategy.java | 31 ++- .../ReactorAsyncSpanEndStrategyBuilder.java | 22 ++ .../reactor/TracingOperator.java | 12 +- .../ReactorAsyncSpanEndStrategyTest.groovy | 63 +++++- .../rxjava2/RxJava2AsyncSpanEndStrategy.java | 31 ++- .../RxJava2AsyncSpanEndStrategyBuilder.java | 23 ++ .../rxjava2/TracingAssembly.java | 12 +- .../RxJava2AsyncSpanEndStrategyTest.groovy | 196 +++++++++++++++++- .../rxjava3/RxJava3AsyncSpanEndStrategy.java | 31 ++- .../RxJava3AsyncSpanEndStrategyBuilder.java | 23 ++ .../rxjava3/TracingAssembly.java | 12 +- .../RxJava3AsyncSpanEndStrategyTest.groovy | 196 +++++++++++++++++- 12 files changed, 608 insertions(+), 44 deletions(-) create mode 100644 instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyBuilder.java create mode 100644 instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategyBuilder.java create mode 100644 instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategyBuilder.java diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategy.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategy.java index 9f02e99fd9a2..cd3d1e1e771e 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategy.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategy.java @@ -5,18 +5,34 @@ package io.opentelemetry.instrumentation.reactor; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.tracer.BaseTracer; import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy; -import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public enum ReactorAsyncSpanEndStrategy implements AsyncSpanEndStrategy { - INSTANCE; +public final class ReactorAsyncSpanEndStrategy implements AsyncSpanEndStrategy { + private static final AttributeKey CANCELED_ATTRIBUTE_KEY = + AttributeKey.booleanKey("reactor.canceled"); + + public static ReactorAsyncSpanEndStrategy create() { + return newBuilder().build(); + } + + public static ReactorAsyncSpanEndStrategyBuilder newBuilder() { + return new ReactorAsyncSpanEndStrategyBuilder(); + } + + private final boolean captureExperimentalSpanAttributes; + + ReactorAsyncSpanEndStrategy(boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + } @Override public boolean supports(Class returnType) { @@ -46,7 +62,7 @@ public Object end(BaseTracer tracer, Context context, Object returnValue) { * OnError notifications are received. Multiple notifications can happen anytime multiple * subscribers subscribe to the same publisher. */ - private static final class EndOnFirstNotificationConsumer extends AtomicBoolean + private final class EndOnFirstNotificationConsumer extends AtomicBoolean implements Runnable, Consumer { private final BaseTracer tracer; @@ -63,7 +79,12 @@ public void onSuccess(T ignored) { } public void onCancel() { - accept(new CancellationException()); + if (compareAndSet(false, true)) { + if (captureExperimentalSpanAttributes) { + Span.fromContext(context).setAttribute(CANCELED_ATTRIBUTE_KEY, true); + } + tracer.end(context); + } } @Override diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyBuilder.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyBuilder.java new file mode 100644 index 000000000000..609e5b6487b3 --- /dev/null +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyBuilder.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.reactor; + +public final class ReactorAsyncSpanEndStrategyBuilder { + private boolean captureExperimentalSpanAttributes; + + ReactorAsyncSpanEndStrategyBuilder() {} + + public ReactorAsyncSpanEndStrategyBuilder setCaptureExperimentalSpanAttributes( + boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + return this; + } + + public ReactorAsyncSpanEndStrategy build() { + return new ReactorAsyncSpanEndStrategy(captureExperimentalSpanAttributes); + } +} diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java index 5c56652e9876..76e4d2ed44d4 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java @@ -23,6 +23,7 @@ package io.opentelemetry.instrumentation.reactor; import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.config.Config; import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategies; import java.util.function.BiFunction; import java.util.function.Function; @@ -44,13 +45,20 @@ public final class TracingOperator { */ public static void registerOnEachOperator() { Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift()); - AsyncSpanEndStrategies.getInstance().registerStrategy(ReactorAsyncSpanEndStrategy.INSTANCE); + AsyncSpanEndStrategies.getInstance() + .registerStrategy( + ReactorAsyncSpanEndStrategy.newBuilder() + .setCaptureExperimentalSpanAttributes( + Config.get() + .getBooleanProperty( + "otel.instrumentation.reactor.experimental-span-attributes", false)) + .build()); } /** Unregisters the hook registered by {@link #registerOnEachOperator()}. */ public static void resetOnEachOperator() { Hooks.resetOnEachOperator(TracingSubscriber.class.getName()); - AsyncSpanEndStrategies.getInstance().unregisterStrategy(ReactorAsyncSpanEndStrategy.INSTANCE); + AsyncSpanEndStrategies.getInstance().unregisterStrategy(ReactorAsyncSpanEndStrategy.class); } private static Function, ? extends Publisher> tracingLift() { diff --git a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy index 1c486c018ff4..5963e5597f8a 100644 --- a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy +++ b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy @@ -5,6 +5,7 @@ package io.opentelemetry.instrumentation.reactor +import io.opentelemetry.api.trace.Span import io.opentelemetry.context.Context import io.opentelemetry.instrumentation.api.tracer.BaseTracer import reactor.core.publisher.Flux @@ -20,11 +21,19 @@ class ReactorAsyncSpanEndStrategyTest extends Specification { Context context - def underTest = ReactorAsyncSpanEndStrategy.INSTANCE + Span span + + def underTest = ReactorAsyncSpanEndStrategy.create() + + def underTestWithExperimentalAttributes = ReactorAsyncSpanEndStrategy.newBuilder() + .setCaptureExperimentalSpanAttributes(true) + .build() void setup() { tracer = Mock() context = Mock() + span = Mock() + span.storeInContext(_) >> { callRealMethod() } } static class MonoTest extends ReactorAsyncSpanEndStrategyTest { @@ -137,6 +146,7 @@ class ReactorAsyncSpanEndStrategyTest extends Specification { given: def source = UnicastProcessor.create() def mono = source.singleOrEmpty() + def context = span.storeInContext(Context.root()) when: def result = (Mono) underTest.end(tracer, context, mono) @@ -150,7 +160,30 @@ class ReactorAsyncSpanEndStrategyTest extends Specification { verifier.thenCancel().verify() then: - 1 * tracer.endExceptionally(context, _ as CancellationException) + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = UnicastProcessor.create() + def mono = source.singleOrEmpty() + def context = span.storeInContext(Context.root()) + + when: + def result = (Mono) underTestWithExperimentalAttributes.end(tracer, context, mono) + def verifier = StepVerifier.create(result) + .expectSubscription() + + then: + 0 * tracer._ + + when: + verifier.thenCancel().verify() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "reactor.canceled" }, true) } def "ends span once for multiple subscribers"() { @@ -278,6 +311,7 @@ class ReactorAsyncSpanEndStrategyTest extends Specification { def "ends span when cancelled"() { given: def source = UnicastProcessor.create() + def context = span.storeInContext(Context.root()) when: def result = (Flux) underTest.end(tracer, context, source) @@ -292,7 +326,30 @@ class ReactorAsyncSpanEndStrategyTest extends Specification { .verify() then: - 1 * tracer.endExceptionally(context, _ as CancellationException) + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = UnicastProcessor.create() + def context = span.storeInContext(Context.root()) + + when: + def result = (Flux) underTestWithExperimentalAttributes.end(tracer, context, source) + def verifier = StepVerifier.create(result) + .expectSubscription() + + then: + 0 * tracer._ + + when: + verifier.thenCancel() + .verify() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "reactor.canceled" }, true) } def "ends span once for multiple subscribers"() { diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategy.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategy.java index 37c1e2b708a6..47083d1d37ab 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategy.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategy.java @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.rxjava2; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.tracer.BaseTracer; import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy; @@ -17,12 +19,26 @@ import io.reactivex.functions.BiConsumer; import io.reactivex.functions.Consumer; import io.reactivex.parallel.ParallelFlowable; -import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicBoolean; import org.reactivestreams.Publisher; -public enum RxJava2AsyncSpanEndStrategy implements AsyncSpanEndStrategy { - INSTANCE; +public final class RxJava2AsyncSpanEndStrategy implements AsyncSpanEndStrategy { + private static final AttributeKey CANCELED_ATTRIBUTE_KEY = + AttributeKey.booleanKey("reactor.canceled"); + + public static RxJava2AsyncSpanEndStrategy create() { + return newBuilder().build(); + } + + public static RxJava2AsyncSpanEndStrategyBuilder newBuilder() { + return new RxJava2AsyncSpanEndStrategyBuilder(); + } + + private final boolean captureExperimentalSpanAttributes; + + RxJava2AsyncSpanEndStrategy(boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + } @Override public boolean supports(Class returnType) { @@ -107,7 +123,7 @@ private static Flowable endWhenPublisherComplete( * OnError notifications are received. Multiple notifications can happen anytime multiple * subscribers subscribe to the same publisher. */ - private static final class EndOnFirstNotificationConsumer extends AtomicBoolean + private final class EndOnFirstNotificationConsumer extends AtomicBoolean implements Action, Consumer, BiConsumer { private final BaseTracer tracer; @@ -125,7 +141,12 @@ public void run() { } public void onCancelOrDispose() { - accept(new CancellationException()); + if (compareAndSet(false, true)) { + if (captureExperimentalSpanAttributes) { + Span.fromContext(context).setAttribute(CANCELED_ATTRIBUTE_KEY, true); + } + tracer.end(context); + } } @Override diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategyBuilder.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategyBuilder.java new file mode 100644 index 000000000000..b049e6dc5ca5 --- /dev/null +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategyBuilder.java @@ -0,0 +1,23 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rxjava2; + +public final class RxJava2AsyncSpanEndStrategyBuilder { + + private boolean captureExperimentalSpanAttributes; + + RxJava2AsyncSpanEndStrategyBuilder() {} + + public RxJava2AsyncSpanEndStrategyBuilder setCaptureExperimentalSpanAttributes( + boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + return this; + } + + public RxJava2AsyncSpanEndStrategy build() { + return new RxJava2AsyncSpanEndStrategy(captureExperimentalSpanAttributes); + } +} diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java index c7d40c5c7622..4d43cce6cdba 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java @@ -24,6 +24,7 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.config.Config; import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategies; import io.reactivex.Completable; import io.reactivex.CompletableObserver; @@ -230,7 +231,14 @@ private static void enableMaybe() { } private static void enableWithSpanStrategy() { - AsyncSpanEndStrategies.getInstance().registerStrategy(RxJava2AsyncSpanEndStrategy.INSTANCE); + AsyncSpanEndStrategies.getInstance() + .registerStrategy( + RxJava2AsyncSpanEndStrategy.newBuilder() + .setCaptureExperimentalSpanAttributes( + Config.get() + .getBooleanProperty( + "otel.instrumentation.rxjava.experimental-span-attributes", false)) + .build()); } private static void disableParallel() { @@ -266,7 +274,7 @@ private static void disableMaybe() { } private static void disableWithSpanStrategy() { - AsyncSpanEndStrategies.getInstance().unregisterStrategy(RxJava2AsyncSpanEndStrategy.INSTANCE); + AsyncSpanEndStrategies.getInstance().unregisterStrategy(RxJava2AsyncSpanEndStrategy.class); } private static Function compose( diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2AsyncSpanEndStrategyTest.groovy b/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2AsyncSpanEndStrategyTest.groovy index 42da06f7aa6f..711d75805192 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2AsyncSpanEndStrategyTest.groovy +++ b/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2AsyncSpanEndStrategyTest.groovy @@ -3,6 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ +import io.opentelemetry.api.trace.Span import io.opentelemetry.context.Context import io.opentelemetry.instrumentation.api.tracer.BaseTracer import io.opentelemetry.instrumentation.rxjava2.RxJava2AsyncSpanEndStrategy @@ -26,18 +27,24 @@ import org.reactivestreams.Subscriber import org.reactivestreams.Subscription import spock.lang.Specification -import java.util.concurrent.CancellationException - class RxJava2AsyncSpanEndStrategyTest extends Specification { BaseTracer tracer Context context - def underTest = RxJava2AsyncSpanEndStrategy.INSTANCE + Span span + + def underTest = RxJava2AsyncSpanEndStrategy.create() + + def underTestWithExperimentalAttributes = RxJava2AsyncSpanEndStrategy.newBuilder() + .setCaptureExperimentalSpanAttributes(true) + .build() void setup() { tracer = Mock() context = Mock() + span = Mock() + span.storeInContext(_) >> { callRealMethod() } } static class CompletableTest extends RxJava2AsyncSpanEndStrategyTest { @@ -118,6 +125,7 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { given: def source = CompletableSubject.create() def observer = new TestObserver() + def context = span.storeInContext(Context.root()) when: def result = (Completable) underTest.end(tracer, context, source) @@ -125,12 +133,36 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { then: 0 * tracer._ + 0 * span._ when: observer.cancel() then: - 1 * tracer.endExceptionally(context, _ as CancellationException) + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = CompletableSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Completable) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) } def "ends span once for multiple subscribers"() { @@ -271,6 +303,7 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { given: def source = MaybeSubject.create() def observer = new TestObserver() + def context = span.storeInContext(Context.root()) when: def result = (Maybe) underTest.end(tracer, context, source) @@ -283,7 +316,30 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.cancel() then: - 1 * tracer.endExceptionally(context, _ as CancellationException) + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = MaybeSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Maybe) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) } def "ends span once for multiple subscribers"() { @@ -394,6 +450,7 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { given: def source = SingleSubject.create() def observer = new TestObserver() + def context = span.storeInContext(Context.root()) when: def result = (Single) underTest.end(tracer, context, source) @@ -406,7 +463,30 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.cancel() then: - 1 * tracer.endExceptionally(context, _ as CancellationException) + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = SingleSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Single) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) } def "ends span once for multiple subscribers"() { @@ -517,6 +597,7 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { given: def source = UnicastSubject.create() def observer = new TestObserver() + def context = span.storeInContext(Context.root()) when: def result = (Observable) underTest.end(tracer, context, source) @@ -529,7 +610,30 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.cancel() then: - 1 * tracer.endExceptionally(context, _ as CancellationException) + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = UnicastSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Observable) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) } def "ends span once for multiple subscribers"() { @@ -637,6 +741,7 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { given: def source = UnicastProcessor.create() def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) when: def result = (Flowable) underTest.end(tracer, context, source) @@ -649,7 +754,30 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.cancel() then: - 1 * tracer.endExceptionally(context, _ as CancellationException) + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) + + when: + def result = (Flowable) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) } def "ends span once for multiple subscribers"() { @@ -757,6 +885,7 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { given: def source = UnicastProcessor.create() def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) when: def result = (ParallelFlowable) underTest.end(tracer, context, source.parallel()) @@ -769,7 +898,30 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.cancel() then: - 1 * tracer.endExceptionally(context, _ as CancellationException) + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) + + when: + def result = (ParallelFlowable) underTestWithExperimentalAttributes.end(tracer, context, source.parallel()) + result.sequential().subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) } } @@ -824,6 +976,7 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { given: def source = new CustomPublisher() def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) when: def result = (Flowable) underTest.end(tracer, context, source) @@ -836,7 +989,30 @@ class RxJava2AsyncSpanEndStrategyTest extends Specification { observer.cancel() then: - 1 * tracer.endExceptionally(context, _ as CancellationException) + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = new CustomPublisher() + def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) + + when: + def result = (Flowable) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) } } diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategy.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategy.java index 04b91ebe6fae..1b901dc547a1 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategy.java +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategy.java @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.rxjava3; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.tracer.BaseTracer; import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy; @@ -17,12 +19,26 @@ import io.reactivex.rxjava3.functions.BiConsumer; import io.reactivex.rxjava3.functions.Consumer; import io.reactivex.rxjava3.parallel.ParallelFlowable; -import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicBoolean; import org.reactivestreams.Publisher; -public enum RxJava3AsyncSpanEndStrategy implements AsyncSpanEndStrategy { - INSTANCE; +public final class RxJava3AsyncSpanEndStrategy implements AsyncSpanEndStrategy { + private static final AttributeKey CANCELED_ATTRIBUTE_KEY = + AttributeKey.booleanKey("rxjava.canceled"); + + public static RxJava3AsyncSpanEndStrategy create() { + return newBuilder().build(); + } + + public static RxJava3AsyncSpanEndStrategyBuilder newBuilder() { + return new RxJava3AsyncSpanEndStrategyBuilder(); + } + + private final boolean captureExperimentalSpanAttributes; + + RxJava3AsyncSpanEndStrategy(boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + } @Override public boolean supports(Class returnType) { @@ -107,7 +123,7 @@ private static Flowable endWhenPublisherComplete( * OnError notifications are received. Multiple notifications can happen anytime multiple * subscribers subscribe to the same publisher. */ - private static final class EndOnFirstNotificationConsumer extends AtomicBoolean + private final class EndOnFirstNotificationConsumer extends AtomicBoolean implements Action, Consumer, BiConsumer { private final BaseTracer tracer; @@ -120,7 +136,12 @@ public EndOnFirstNotificationConsumer(BaseTracer tracer, Context context) { } public void onCancelOrDispose() { - accept(new CancellationException()); + if (compareAndSet(false, true)) { + if (captureExperimentalSpanAttributes) { + Span.fromContext(context).setAttribute(CANCELED_ATTRIBUTE_KEY, true); + } + tracer.end(context); + } } @Override diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategyBuilder.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategyBuilder.java new file mode 100644 index 000000000000..04ac6cf2a21b --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncSpanEndStrategyBuilder.java @@ -0,0 +1,23 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rxjava3; + +public final class RxJava3AsyncSpanEndStrategyBuilder { + + private boolean captureExperimentalSpanAttributes; + + RxJava3AsyncSpanEndStrategyBuilder() {} + + public RxJava3AsyncSpanEndStrategyBuilder setCaptureExperimentalSpanAttributes( + boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + return this; + } + + public RxJava3AsyncSpanEndStrategy build() { + return new RxJava3AsyncSpanEndStrategy(captureExperimentalSpanAttributes); + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssembly.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssembly.java index eb400a629784..b92b14658162 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssembly.java @@ -24,6 +24,7 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.config.Config; import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategies; import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.CompletableObserver; @@ -228,7 +229,14 @@ private static void enableMaybe() { } private static void enableWithSpanStrategy() { - AsyncSpanEndStrategies.getInstance().registerStrategy(RxJava3AsyncSpanEndStrategy.INSTANCE); + AsyncSpanEndStrategies.getInstance() + .registerStrategy( + RxJava3AsyncSpanEndStrategy.newBuilder() + .setCaptureExperimentalSpanAttributes( + Config.get() + .getBooleanProperty( + "otel.instrumentation.rxjava.experimental-span-attributes", false)) + .build()); } private static void disableParallel() { @@ -264,7 +272,7 @@ private static void disableMaybe() { } private static void disableWithSpanStrategy() { - AsyncSpanEndStrategies.getInstance().unregisterStrategy(RxJava3AsyncSpanEndStrategy.INSTANCE); + AsyncSpanEndStrategies.getInstance().unregisterStrategy(RxJava3AsyncSpanEndStrategy.class); } private static Function compose( diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3AsyncSpanEndStrategyTest.groovy b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3AsyncSpanEndStrategyTest.groovy index bf0e287368ff..0e8956a3da06 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3AsyncSpanEndStrategyTest.groovy +++ b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3AsyncSpanEndStrategyTest.groovy @@ -3,6 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ +import io.opentelemetry.api.trace.Span import io.opentelemetry.context.Context import io.opentelemetry.instrumentation.api.tracer.BaseTracer import io.opentelemetry.instrumentation.rxjava3.RxJava3AsyncSpanEndStrategy @@ -26,18 +27,24 @@ import org.reactivestreams.Subscriber import org.reactivestreams.Subscription import spock.lang.Specification -import java.util.concurrent.CancellationException - class RxJava3AsyncSpanEndStrategyTest extends Specification { BaseTracer tracer Context context - def underTest = RxJava3AsyncSpanEndStrategy.INSTANCE + Span span + + def underTest = RxJava3AsyncSpanEndStrategy.create() + + def underTestWithExperimentalAttributes = RxJava3AsyncSpanEndStrategy.newBuilder() + .setCaptureExperimentalSpanAttributes(true) + .build() void setup() { tracer = Mock() context = Mock() + span = Mock() + span.storeInContext(_) >> { callRealMethod() } } static class CompletableTest extends RxJava3AsyncSpanEndStrategyTest { @@ -118,6 +125,7 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { given: def source = CompletableSubject.create() def observer = new TestObserver() + def context = span.storeInContext(Context.root()) when: def result = (Completable) underTest.end(tracer, context, source) @@ -125,12 +133,36 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { then: 0 * tracer._ + 0 * span._ when: observer.dispose() then: - 1 * tracer.endExceptionally(context, _ as CancellationException) + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = CompletableSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Completable) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.dispose() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) } def "ends span once for multiple subscribers"() { @@ -271,6 +303,7 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { given: def source = MaybeSubject.create() def observer = new TestObserver() + def context = span.storeInContext(Context.root()) when: def result = (Maybe) underTest.end(tracer, context, source) @@ -283,7 +316,30 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { observer.dispose() then: - 1 * tracer.endExceptionally(context, _ as CancellationException) + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = MaybeSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Maybe) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.dispose() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) } def "ends span once for multiple subscribers"() { @@ -394,6 +450,7 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { given: def source = SingleSubject.create() def observer = new TestObserver() + def context = span.storeInContext(Context.root()) when: def result = (Single) underTest.end(tracer, context, source) @@ -406,7 +463,30 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { observer.dispose() then: - 1 * tracer.endExceptionally(context, _ as CancellationException) + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = SingleSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Single) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.dispose() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) } def "ends span once for multiple subscribers"() { @@ -517,6 +597,7 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { given: def source = UnicastSubject.create() def observer = new TestObserver() + def context = span.storeInContext(Context.root()) when: def result = (Observable) underTest.end(tracer, context, source) @@ -529,7 +610,30 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { observer.dispose() then: - 1 * tracer.endExceptionally(context, _ as CancellationException) + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = UnicastSubject.create() + def observer = new TestObserver() + def context = span.storeInContext(Context.root()) + + when: + def result = (Observable) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.dispose() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) } def "ends span once for multiple subscribers"() { @@ -637,6 +741,7 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { given: def source = UnicastProcessor.create() def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) when: def result = (Flowable) underTest.end(tracer, context, source) @@ -649,7 +754,30 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { observer.cancel() then: - 1 * tracer.endExceptionally(context, _ as CancellationException) + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) + + when: + def result = (Flowable) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) } def "ends span once for multiple subscribers"() { @@ -757,6 +885,7 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { given: def source = UnicastProcessor.create() def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) when: def result = (ParallelFlowable) underTest.end(tracer, context, source.parallel()) @@ -769,7 +898,30 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { observer.cancel() then: - 1 * tracer.endExceptionally(context, _ as CancellationException) + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = UnicastProcessor.create() + def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) + + when: + def result = (ParallelFlowable) underTestWithExperimentalAttributes.end(tracer, context, source.parallel()) + result.sequential().subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) } } @@ -824,6 +976,7 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { given: def source = new CustomPublisher() def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) when: def result = (Flowable) underTest.end(tracer, context, source) @@ -836,7 +989,30 @@ class RxJava3AsyncSpanEndStrategyTest extends Specification { observer.cancel() then: - 1 * tracer.endExceptionally(context, _ as CancellationException) + 1 * tracer.end(context) + 0 * span.setAttribute(_) + } + + def "ends span when cancelled and capturing experimental span attributes"() { + given: + def source = new CustomPublisher() + def observer = new TestSubscriber() + def context = span.storeInContext(Context.root()) + + when: + def result = (Flowable) underTestWithExperimentalAttributes.end(tracer, context, source) + result.subscribe(observer) + + then: + 0 * tracer._ + 0 * span._ + + when: + observer.cancel() + + then: + 1 * tracer.end(context) + 1 * span.setAttribute({ it.getKey() == "rxjava.canceled" }, true) } } From b5a332a60e60e43d9732b3682085c8d4f18470dc Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Tue, 1 Jun 2021 11:24:07 -0400 Subject: [PATCH 04/12] Fix span attribute name --- .../instrumentation/rxjava2/RxJava2AsyncSpanEndStrategy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategy.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategy.java index 47083d1d37ab..38701b5402e1 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategy.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2AsyncSpanEndStrategy.java @@ -24,7 +24,7 @@ public final class RxJava2AsyncSpanEndStrategy implements AsyncSpanEndStrategy { private static final AttributeKey CANCELED_ATTRIBUTE_KEY = - AttributeKey.booleanKey("reactor.canceled"); + AttributeKey.booleanKey("rxjava.canceled"); public static RxJava2AsyncSpanEndStrategy create() { return newBuilder().build(); From 8bbba3bf1f920317f3730f84e7205e58da01bc96 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Tue, 1 Jun 2021 11:48:40 -0400 Subject: [PATCH 05/12] Change TracingOperator and TracingAssembly to accept configuration from Javaagent --- .../reactor/HooksInstrumentation.java | 9 ++- .../reactor/TracingOperator.java | 24 ++++-- .../reactor/TracingOperatorBuilder.java | 22 ++++++ .../instrumentation/reactor/HooksTest.groovy | 5 +- .../reactor/SubscriptionTest.groovy | 8 +- .../rxjava2/TracingAssemblyActivation.java | 9 ++- .../rxjava2/TracingAssembly.java | 74 +++++++++++-------- .../rxjava2/TracingAssemblyBuilder.java | 22 ++++++ .../groovy/RxJava2SubscriptionTest.groovy | 5 +- .../src/test/groovy/RxJava2Test.groovy | 5 +- .../rxjava3/TracingAssemblyActivation.java | 9 ++- .../rxjava3/TracingAssembly.java | 74 +++++++++++-------- .../rxjava3/TracingAssemblyBuilder.java | 22 ++++++ .../groovy/RxJava3SubscriptionTest.groovy | 5 +- .../src/test/groovy/RxJava3Test.groovy | 5 +- 15 files changed, 218 insertions(+), 80 deletions(-) create mode 100644 instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperatorBuilder.java create mode 100644 instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssemblyBuilder.java create mode 100644 instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyBuilder.java diff --git a/instrumentation/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/HooksInstrumentation.java b/instrumentation/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/HooksInstrumentation.java index 425f6b355279..73d62f973576 100644 --- a/instrumentation/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/HooksInstrumentation.java +++ b/instrumentation/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/HooksInstrumentation.java @@ -8,6 +8,7 @@ import static net.bytebuddy.matcher.ElementMatchers.isTypeInitializer; import static net.bytebuddy.matcher.ElementMatchers.named; +import io.opentelemetry.instrumentation.api.config.Config; import io.opentelemetry.instrumentation.reactor.TracingOperator; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; @@ -31,7 +32,13 @@ public void transform(TypeTransformer transformer) { public static class ResetOnEachOperatorAdvice { @Advice.OnMethodExit(suppress = Throwable.class) public static void postStaticInitializer() { - TracingOperator.registerOnEachOperator(); + TracingOperator.newBuilder() + .setCaptureExperimentalSpanAttributes( + Config.get() + .getBooleanProperty( + "otel.instrumentation.reactor.experimental-span-attributes", false)) + .build() + .registerOnEachOperator(); } } } diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java index 76e4d2ed44d4..927dcfed1b58 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java @@ -23,7 +23,6 @@ package io.opentelemetry.instrumentation.reactor; import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.config.Config; import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategies; import java.util.function.BiFunction; import java.util.function.Function; @@ -37,26 +36,37 @@ /** Based on Spring Sleuth's Reactor instrumentation. */ public final class TracingOperator { + public static TracingOperator create() { + return newBuilder().build(); + } + + public static TracingOperatorBuilder newBuilder() { + return new TracingOperatorBuilder(); + } + + private final boolean captureExperimentalSpanAttributes; + + TracingOperator(boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + } + /** * Registers a hook that applies to every operator, propagating {@link Context} to downstream * callbacks to ensure spans in the {@link Context} are available throughout the lifetime of a * reactive stream. This should generally be called in a static initializer block in your * application. */ - public static void registerOnEachOperator() { + public void registerOnEachOperator() { Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift()); AsyncSpanEndStrategies.getInstance() .registerStrategy( ReactorAsyncSpanEndStrategy.newBuilder() - .setCaptureExperimentalSpanAttributes( - Config.get() - .getBooleanProperty( - "otel.instrumentation.reactor.experimental-span-attributes", false)) + .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes) .build()); } /** Unregisters the hook registered by {@link #registerOnEachOperator()}. */ - public static void resetOnEachOperator() { + public void resetOnEachOperator() { Hooks.resetOnEachOperator(TracingSubscriber.class.getName()); AsyncSpanEndStrategies.getInstance().unregisterStrategy(ReactorAsyncSpanEndStrategy.class); } diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperatorBuilder.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperatorBuilder.java new file mode 100644 index 000000000000..99889cfa3cc2 --- /dev/null +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperatorBuilder.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.reactor; + +public final class TracingOperatorBuilder { + private boolean captureExperimentalSpanAttributes; + + TracingOperatorBuilder() {} + + public TracingOperatorBuilder setCaptureExperimentalSpanAttributes( + boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + return this; + } + + public TracingOperator build() { + return new TracingOperator(captureExperimentalSpanAttributes); + } +} diff --git a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/HooksTest.groovy b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/HooksTest.groovy index 3cf2af8a5fbe..7760696bec54 100644 --- a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/HooksTest.groovy +++ b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/HooksTest.groovy @@ -14,6 +14,7 @@ class HooksTest extends LibraryInstrumentationSpecification { def "can reset out hooks"() { setup: + def underTest = TracingOperator.create() AtomicReference subscriber = new AtomicReference<>() when: "no hook registered" @@ -23,14 +24,14 @@ class HooksTest extends LibraryInstrumentationSpecification { !(subscriber.get() instanceof TracingSubscriber) when: "hook registered" - TracingOperator.registerOnEachOperator() + underTest.registerOnEachOperator() new CapturingMono(subscriber).map { it + 1 }.subscribe() then: subscriber.get() instanceof TracingSubscriber when: "hook reset" - TracingOperator.resetOnEachOperator() + underTest.resetOnEachOperator() new CapturingMono(subscriber).map { it + 1 }.subscribe() then: diff --git a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/SubscriptionTest.groovy b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/SubscriptionTest.groovy index 41ad20821707..9bfc899474a6 100644 --- a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/SubscriptionTest.groovy +++ b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/SubscriptionTest.groovy @@ -6,13 +6,17 @@ package io.opentelemetry.instrumentation.reactor import io.opentelemetry.instrumentation.test.LibraryTestTrait +import spock.lang.Shared class SubscriptionTest extends AbstractSubscriptionTest implements LibraryTestTrait { + @Shared + TracingOperator tracingOperator = TracingOperator.create() + def setupSpec() { - TracingOperator.registerOnEachOperator() + tracingOperator.registerOnEachOperator() } def cleanupSpec() { - TracingOperator.resetOnEachOperator() + tracingOperator.resetOnEachOperator() } } diff --git a/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssemblyActivation.java b/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssemblyActivation.java index c0fbbe2359a9..b0735d346abc 100644 --- a/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssemblyActivation.java +++ b/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssemblyActivation.java @@ -5,6 +5,7 @@ package io.opentelemetry.instrumentation.rxjava2; +import io.opentelemetry.instrumentation.api.config.Config; import java.util.concurrent.atomic.AtomicBoolean; public final class TracingAssemblyActivation { @@ -19,7 +20,13 @@ protected AtomicBoolean computeValue(Class type) { public static void activate(Class clz) { if (activated.get(clz).compareAndSet(false, true)) { - TracingAssembly.enable(); + TracingAssembly.newBuilder() + .setCaptureExperimentalSpanAttributes( + Config.get() + .getBooleanProperty( + "otel.instrumentation.rxjava.experimental-span-attributes", false)) + .build() + .enable(); } } diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java index 4d43cce6cdba..d9323bfec6ef 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java @@ -24,7 +24,6 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.api.config.Config; import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategies; import io.reactivex.Completable; import io.reactivex.CompletableObserver; @@ -97,50 +96,66 @@ public final class TracingAssembly { @GuardedBy("TracingAssembly.class") private static boolean enabled; - private TracingAssembly() {} + public static TracingAssembly create() { + return newBuilder().build(); + } - public static synchronized void enable() { - if (enabled) { - return; - } + public static TracingAssemblyBuilder newBuilder() { + return new TracingAssemblyBuilder(); + } - enableObservable(); + private final boolean captureExperimentalSpanAttributes; - enableCompletable(); + TracingAssembly(boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + } - enableSingle(); + public void enable() { + synchronized (TracingAssembly.class) { + if (enabled) { + return; + } - enableMaybe(); + enableObservable(); - enableFlowable(); + enableCompletable(); - enableParallel(); + enableSingle(); - enableWithSpanStrategy(); + enableMaybe(); - enabled = true; - } + enableFlowable(); - public static synchronized void disable() { - if (!enabled) { - return; + enableParallel(); + + enableWithSpanStrategy(captureExperimentalSpanAttributes); + + enabled = true; } + } - disableObservable(); + public void disable() { + synchronized (TracingAssembly.class) { + if (!enabled) { + return; + } - disableCompletable(); + disableObservable(); - disableSingle(); + disableCompletable(); - disableMaybe(); + disableSingle(); - disableFlowable(); + disableMaybe(); - disableParallel(); + disableFlowable(); - disableWithSpanStrategy(); + disableParallel(); - enabled = false; + disableWithSpanStrategy(); + + enabled = false; + } } @SuppressWarnings({"rawtypes", "unchecked"}) @@ -230,14 +245,11 @@ private static void enableMaybe() { })); } - private static void enableWithSpanStrategy() { + private static void enableWithSpanStrategy(boolean captureExperimentalSpanAttributes) { AsyncSpanEndStrategies.getInstance() .registerStrategy( RxJava2AsyncSpanEndStrategy.newBuilder() - .setCaptureExperimentalSpanAttributes( - Config.get() - .getBooleanProperty( - "otel.instrumentation.rxjava.experimental-span-attributes", false)) + .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes) .build()); } diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssemblyBuilder.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssemblyBuilder.java new file mode 100644 index 000000000000..050d039ff29e --- /dev/null +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssemblyBuilder.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rxjava2; + +public final class TracingAssemblyBuilder { + private boolean captureExperimentalSpanAttributes; + + TracingAssemblyBuilder() {} + + public TracingAssemblyBuilder setCaptureExperimentalSpanAttributes( + boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + return this; + } + + public TracingAssembly build() { + return new TracingAssembly(captureExperimentalSpanAttributes); + } +} diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2SubscriptionTest.groovy b/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2SubscriptionTest.groovy index 15d2f624c8ae..935e49de1e1e 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2SubscriptionTest.groovy +++ b/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2SubscriptionTest.groovy @@ -6,10 +6,13 @@ import io.opentelemetry.instrumentation.rxjava2.AbstractRxJava2SubscriptionTest import io.opentelemetry.instrumentation.rxjava2.TracingAssembly import io.opentelemetry.instrumentation.test.LibraryTestTrait +import spock.lang.Shared class RxJava2SubscriptionTest extends AbstractRxJava2SubscriptionTest implements LibraryTestTrait { + @Shared + TracingAssembly tracingAssembly = TracingAssembly.create() def setupSpec() { - TracingAssembly.enable() + tracingAssembly.enable() } } diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2Test.groovy b/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2Test.groovy index d379cbce7885..8b9ca49ea48c 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2Test.groovy +++ b/instrumentation/rxjava/rxjava-2.0/library/src/test/groovy/RxJava2Test.groovy @@ -6,10 +6,13 @@ import io.opentelemetry.instrumentation.rxjava2.AbstractRxJava2Test import io.opentelemetry.instrumentation.rxjava2.TracingAssembly import io.opentelemetry.instrumentation.test.LibraryTestTrait +import spock.lang.Shared class RxJava2Test extends AbstractRxJava2Test implements LibraryTestTrait { + @Shared + TracingAssembly tracingAssembly = TracingAssembly.create() def setupSpec() { - TracingAssembly.enable() + tracingAssembly.enable() } } diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyActivation.java b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyActivation.java index 92c4937e87d1..8b276353d279 100644 --- a/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyActivation.java +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyActivation.java @@ -5,6 +5,7 @@ package io.opentelemetry.instrumentation.rxjava3; +import io.opentelemetry.instrumentation.api.config.Config; import java.util.concurrent.atomic.AtomicBoolean; public final class TracingAssemblyActivation { @@ -19,7 +20,13 @@ protected AtomicBoolean computeValue(Class type) { public static void activate(Class clz) { if (activated.get(clz).compareAndSet(false, true)) { - TracingAssembly.enable(); + TracingAssembly.newBuilder() + .setCaptureExperimentalSpanAttributes( + Config.get() + .getBooleanProperty( + "otel.instrumentation.rxjava.experimental-span-attributes", false)) + .build() + .enable(); } } diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssembly.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssembly.java index b92b14658162..796ec4824914 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssembly.java @@ -24,7 +24,6 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.api.config.Config; import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategies; import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.CompletableObserver; @@ -97,50 +96,66 @@ public final class TracingAssembly { @GuardedBy("TracingAssembly.class") private static boolean enabled; - private TracingAssembly() {} + public static TracingAssembly create() { + return newBuilder().build(); + } - public static synchronized void enable() { - if (enabled) { - return; - } + public static TracingAssemblyBuilder newBuilder() { + return new TracingAssemblyBuilder(); + } - enableObservable(); + private final boolean captureExperimentalSpanAttributes; - enableCompletable(); + TracingAssembly(boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + } - enableSingle(); + public void enable() { + synchronized (TracingAssembly.class) { + if (enabled) { + return; + } - enableMaybe(); + enableObservable(); - enableFlowable(); + enableCompletable(); - enableParallel(); + enableSingle(); - enableWithSpanStrategy(); + enableMaybe(); - enabled = true; - } + enableFlowable(); - public static synchronized void disable() { - if (!enabled) { - return; + enableParallel(); + + enableWithSpanStrategy(captureExperimentalSpanAttributes); + + enabled = true; } + } - disableObservable(); + public void disable() { + synchronized (TracingAssembly.class) { + if (!enabled) { + return; + } - disableCompletable(); + disableObservable(); - disableSingle(); + disableCompletable(); - disableMaybe(); + disableSingle(); - disableFlowable(); + disableMaybe(); - disableParallel(); + disableFlowable(); - disableWithSpanStrategy(); + disableParallel(); - enabled = false; + disableWithSpanStrategy(); + + enabled = false; + } } @SuppressWarnings({"rawtypes", "unchecked"}) @@ -228,14 +243,11 @@ private static void enableMaybe() { })); } - private static void enableWithSpanStrategy() { + private static void enableWithSpanStrategy(boolean captureExperimentalSpanAttributes) { AsyncSpanEndStrategies.getInstance() .registerStrategy( RxJava3AsyncSpanEndStrategy.newBuilder() - .setCaptureExperimentalSpanAttributes( - Config.get() - .getBooleanProperty( - "otel.instrumentation.rxjava.experimental-span-attributes", false)) + .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes) .build()); } diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyBuilder.java b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyBuilder.java new file mode 100644 index 000000000000..c5321a7b5f39 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyBuilder.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rxjava3; + +public final class TracingAssemblyBuilder { + private boolean captureExperimentalSpanAttributes; + + TracingAssemblyBuilder() {} + + public TracingAssemblyBuilder setCaptureExperimentalSpanAttributes( + boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + return this; + } + + public TracingAssembly build() { + return new TracingAssembly(captureExperimentalSpanAttributes); + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3SubscriptionTest.groovy b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3SubscriptionTest.groovy index 0351dae24a8c..800f5adfe1ca 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3SubscriptionTest.groovy +++ b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3SubscriptionTest.groovy @@ -6,10 +6,13 @@ import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3SubscriptionTest import io.opentelemetry.instrumentation.rxjava3.TracingAssembly import io.opentelemetry.instrumentation.test.LibraryTestTrait +import spock.lang.Shared class RxJava3SubscriptionTest extends AbstractRxJava3SubscriptionTest implements LibraryTestTrait { + @Shared + TracingAssembly tracingAssembly = TracingAssembly.create() def setupSpec() { - TracingAssembly.enable() + tracingAssembly.enable() } } diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3Test.groovy b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3Test.groovy index 4bcf431d69a4..24215f993fab 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3Test.groovy +++ b/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3Test.groovy @@ -6,10 +6,13 @@ import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3Test import io.opentelemetry.instrumentation.rxjava3.TracingAssembly import io.opentelemetry.instrumentation.test.LibraryTestTrait +import spock.lang.Shared class RxJava3Test extends AbstractRxJava3Test implements LibraryTestTrait { + @Shared + TracingAssembly tracingAssembly = TracingAssembly.create() def setupSpec() { - TracingAssembly.enable() + tracingAssembly.enable() } } From 7a94c9bd0b89875654f18e5a58852909f87d755a Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Tue, 1 Jun 2021 11:52:25 -0400 Subject: [PATCH 06/12] Remove unused import --- .../reactor/ReactorAsyncSpanEndStrategyTest.groovy | 2 -- 1 file changed, 2 deletions(-) diff --git a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy index 5963e5597f8a..b6164fb007df 100644 --- a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy +++ b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategyTest.groovy @@ -14,8 +14,6 @@ import reactor.core.publisher.UnicastProcessor import reactor.test.StepVerifier import spock.lang.Specification -import java.util.concurrent.CancellationException - class ReactorAsyncSpanEndStrategyTest extends Specification { BaseTracer tracer From 2db515a431ca27cbc4078349fb0c2ba459a3243b Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Tue, 1 Jun 2021 12:09:24 -0400 Subject: [PATCH 07/12] Remove private constructor --- .../opentelemetry/instrumentation/reactor/TracingOperator.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java index 927dcfed1b58..532db89b7b93 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java @@ -87,6 +87,4 @@ public CoreSubscriber apply(Scannable publisher, CoreSubscriber(sub, sub.currentContext()); } } - - private TracingOperator() {} } From 8790578199a4df1a7d4ffeb2ef78bb3a7f13c4b7 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Tue, 1 Jun 2021 12:33:33 -0400 Subject: [PATCH 08/12] Fix test --- .../instrumentation/reactor/ReactorCoreTest.groovy | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy index 1fb3f8243736..9d7e65acbb71 100644 --- a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy +++ b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy @@ -6,13 +6,17 @@ package io.opentelemetry.instrumentation.reactor import io.opentelemetry.instrumentation.test.LibraryTestTrait +import spock.lang.Shared class ReactorCoreTest extends AbstractReactorCoreTest implements LibraryTestTrait { + @Shared + TracingOperator tracingOperator = TracingOperator.create() + def setupSpec() { - TracingOperator.registerOnEachOperator() + tracingOperator.registerOnEachOperator() } def cleanupSpec() { - TracingOperator.resetOnEachOperator() + tracingOperator.resetOnEachOperator() } } From 712bf7ff2a0e9e55884c7f713cb21b3c0174e4b7 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Tue, 1 Jun 2021 12:54:28 -0400 Subject: [PATCH 09/12] Fix Lettuce tests --- .../lettuce/v5_1/LettuceReactiveClientTest.groovy | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/instrumentation/lettuce/lettuce-5.1/library/src/test/groovy/io/opentelemetry/instrumentation/lettuce/v5_1/LettuceReactiveClientTest.groovy b/instrumentation/lettuce/lettuce-5.1/library/src/test/groovy/io/opentelemetry/instrumentation/lettuce/v5_1/LettuceReactiveClientTest.groovy index 2cfb789cccc1..af26609719d7 100644 --- a/instrumentation/lettuce/lettuce-5.1/library/src/test/groovy/io/opentelemetry/instrumentation/lettuce/v5_1/LettuceReactiveClientTest.groovy +++ b/instrumentation/lettuce/lettuce-5.1/library/src/test/groovy/io/opentelemetry/instrumentation/lettuce/v5_1/LettuceReactiveClientTest.groovy @@ -9,8 +9,12 @@ import io.lettuce.core.RedisClient import io.lettuce.core.resource.ClientResources import io.opentelemetry.instrumentation.reactor.TracingOperator import io.opentelemetry.instrumentation.test.LibraryTestTrait +import spock.lang.Shared class LettuceReactiveClientTest extends AbstractLettuceReactiveClientTest implements LibraryTestTrait { + @Shared + TracingOperator tracingOperator = TracingOperator.create() + @Override RedisClient createClient(String uri) { return RedisClient.create( @@ -21,10 +25,10 @@ class LettuceReactiveClientTest extends AbstractLettuceReactiveClientTest implem } def setupSpec() { - TracingOperator.registerOnEachOperator() + tracingOperator.registerOnEachOperator() } def cleanupSpec() { - TracingOperator.resetOnEachOperator() + tracingOperator.resetOnEachOperator() } } From d3cfc4b6694a18e73df21e6b0f4a200fd18cab49 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Tue, 1 Jun 2021 22:13:46 -0400 Subject: [PATCH 10/12] Add javaagent tests --- .../javaagent/reactor-3.1-javaagent.gradle | 5 + .../ReactorWithSpanInstrumentationTest.groovy | 60 +++++ .../javaagent/rxjava-2.0-javaagent.gradle | 5 + .../RxJava2WithSpanInstrumentationTest.groovy | 222 ++++++++++++++++++ .../javaagent/rxjava-3.0-javaagent.gradle | 5 + .../RxJava3WithSpanInstrumentationTest.groovy | 215 +++++++++++++++++ 6 files changed, 512 insertions(+) diff --git a/instrumentation/reactor-3.1/javaagent/reactor-3.1-javaagent.gradle b/instrumentation/reactor-3.1/javaagent/reactor-3.1-javaagent.gradle index f4f55e428c3d..9aff3fc6d67a 100644 --- a/instrumentation/reactor-3.1/javaagent/reactor-3.1-javaagent.gradle +++ b/instrumentation/reactor-3.1/javaagent/reactor-3.1-javaagent.gradle @@ -9,6 +9,11 @@ muzzle { } } +tasks.withType(Test).configureEach { + // TODO run tests both with and without experimental span attributes + jvmArgs "-Dotel.instrumentation.reactor.experimental-span-attributes=true" +} + dependencies { implementation project(':instrumentation:reactor-3.1:library') diff --git a/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy b/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy index 7787ff7f8b09..47a6fd3e8c7f 100644 --- a/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy +++ b/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy @@ -130,6 +130,35 @@ class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Mono"() { + setup: + def source = UnicastProcessor.create() + def mono = source.singleOrEmpty() + def result = new TracedWithSpan() + .mono(mono) + def verifier = StepVerifier.create(result) + .expectSubscription() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + verifier.thenCancel().verify() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.mono" + kind SpanKind.INTERNAL + hasNoParent() + attributes { + "reactor.canceled" true + } + } + } + } + } + def "should capture span for already completed Flux"() { setup: def source = Flux.just("Value") @@ -242,4 +271,35 @@ class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } } + + def "should capture span for canceled Flux"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = UnicastProcessor.create() + def result = new TracedWithSpan() + .flux(source) + def verifier = StepVerifier.create(result) + .expectSubscription() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onError(error) + + verifier.thenCancel().verify() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flux" + kind SpanKind.INTERNAL + hasNoParent() + attributes { + "reactor.canceled" true + } + } + } + } + } } diff --git a/instrumentation/rxjava/rxjava-2.0/javaagent/rxjava-2.0-javaagent.gradle b/instrumentation/rxjava/rxjava-2.0/javaagent/rxjava-2.0-javaagent.gradle index ec79e2452bf6..72d76dfbc13b 100644 --- a/instrumentation/rxjava/rxjava-2.0/javaagent/rxjava-2.0-javaagent.gradle +++ b/instrumentation/rxjava/rxjava-2.0/javaagent/rxjava-2.0-javaagent.gradle @@ -9,6 +9,11 @@ muzzle { } } +tasks.withType(Test).configureEach { + // TODO run tests both with and without experimental span attributes + jvmArgs "-Dotel.instrumentation.rxjava.experimental-span-attributes=true" +} + dependencies { library "io.reactivex.rxjava2:rxjava:2.0.6" diff --git a/instrumentation/rxjava/rxjava-2.0/javaagent/src/test/groovy/RxJava2WithSpanInstrumentationTest.groovy b/instrumentation/rxjava/rxjava-2.0/javaagent/src/test/groovy/RxJava2WithSpanInstrumentationTest.groovy index 8e33aa20131f..0c089dbea6c5 100644 --- a/instrumentation/rxjava/rxjava-2.0/javaagent/src/test/groovy/RxJava2WithSpanInstrumentationTest.groovy +++ b/instrumentation/rxjava/rxjava-2.0/javaagent/src/test/groovy/RxJava2WithSpanInstrumentationTest.groovy @@ -136,6 +136,35 @@ class RxJava2WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Completable"() { + setup: + def source = CompletableSubject.create() + def observer = new TestObserver() + new TracedWithSpan() + .completable(source) + .subscribe(observer) + observer.assertSubscribed() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.completable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed Maybe"() { setup: def observer = new TestObserver() @@ -271,6 +300,35 @@ class RxJava2WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Maybe"() { + setup: + def source = MaybeSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .maybe(source) + .subscribe(observer) + observer.assertSubscribed() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.maybe" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed Single"() { setup: def observer = new TestObserver() @@ -383,6 +441,35 @@ class RxJava2WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Single"() { + setup: + def source = SingleSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .single(source) + .subscribe(observer) + observer.assertSubscribed() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.single" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed Observable"() { setup: def observer = new TestObserver() @@ -506,6 +593,41 @@ class RxJava2WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Observable"() { + setup: + def source = UnicastSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .observable(source) + .subscribe(observer) + observer.assertSubscribed() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.observable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed Flowable"() { setup: def observer = new TestSubscriber() @@ -629,6 +751,41 @@ class RxJava2WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Flowable"() { + setup: + def source = UnicastProcessor. create() + def observer = new TestSubscriber() + new TracedWithSpan() + .flowable(source) + .subscribe(observer) + observer.assertSubscribed() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.dispose() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flowable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed ParallelFlowable"() { setup: def observer = new TestSubscriber() @@ -756,6 +913,42 @@ class RxJava2WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for eventually errored ParallelFlowable"() { + setup: + def source = UnicastProcessor. create() + def observer = new TestSubscriber() + new TracedWithSpan() + .parallelFlowable(source.parallel()) + .sequential() + .subscribe(observer) + observer.assertSubscribed() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.parallelFlowable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for eventually completed Publisher"() { setup: def source = new CustomPublisher() @@ -817,6 +1010,35 @@ class RxJava2WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Publisher"() { + setup: + def source = new CustomPublisher() + def observer = new TestSubscriber() + new TracedWithSpan() + .publisher(source) + .subscribe(observer) + observer.assertSubscribed() + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.publisher" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + static class CustomPublisher implements Publisher, Subscription { Subscriber subscriber diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/rxjava-3.0-javaagent.gradle b/instrumentation/rxjava/rxjava-3.0/javaagent/rxjava-3.0-javaagent.gradle index 3c681eb80c3c..f6027d3f54fb 100644 --- a/instrumentation/rxjava/rxjava-3.0/javaagent/rxjava-3.0-javaagent.gradle +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/rxjava-3.0-javaagent.gradle @@ -9,6 +9,11 @@ muzzle { } } +tasks.withType(Test).configureEach { + // TODO run tests both with and without experimental span attributes + jvmArgs "-Dotel.instrumentation.rxjava.experimental-span-attributes=true" +} + dependencies { library "io.reactivex.rxjava3:rxjava:3.0.0" diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy b/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy index c94bf8c8af60..dbf9baec4b70 100644 --- a/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy @@ -134,6 +134,34 @@ class RxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Completable"() { + setup: + def source = CompletableSubject.create() + def observer = new TestObserver() + new TracedWithSpan() + .completable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.dispose() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.completable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed Maybe"() { setup: def observer = new TestObserver() @@ -267,6 +295,34 @@ class RxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Maybe"() { + setup: + def source = MaybeSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .maybe(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.dispose() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.maybe" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed Single"() { setup: def observer = new TestObserver() @@ -377,6 +433,34 @@ class RxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Single"() { + setup: + def source = SingleSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .single(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.dispose() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.single" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed Observable"() { setup: def observer = new TestObserver() @@ -498,6 +582,40 @@ class RxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Observable"() { + setup: + def source = UnicastSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .observable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.dispose() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.observable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed Flowable"() { setup: def observer = new TestSubscriber() @@ -619,6 +737,40 @@ class RxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for eventually errored Flowable"() { + setup: + def source = UnicastProcessor. create() + def observer = new TestSubscriber() + new TracedWithSpan() + .flowable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flowable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for already completed ParallelFlowable"() { setup: def observer = new TestSubscriber() @@ -744,6 +896,41 @@ class RxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled ParallelFlowable"() { + setup: + def source = UnicastProcessor. create() + def observer = new TestSubscriber() + new TracedWithSpan() + .parallelFlowable(source.parallel()) + .sequential() + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.parallelFlowable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + def "should capture span for eventually completed Publisher"() { setup: def source = new CustomPublisher() @@ -803,6 +990,34 @@ class RxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture span for canceled Publisher"() { + setup: + def source = new CustomPublisher() + def observer = new TestSubscriber() + new TracedWithSpan() + .publisher(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.publisher" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + static class CustomPublisher implements Publisher, Subscription { Subscriber subscriber From 3a2350bb1f3b0a25a58e170834c33d085f2eb29e Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Wed, 2 Jun 2021 09:12:59 -0400 Subject: [PATCH 11/12] fix copypasta fail --- .../src/test/groovy/RxJava2WithSpanInstrumentationTest.groovy | 2 +- .../src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/instrumentation/rxjava/rxjava-2.0/javaagent/src/test/groovy/RxJava2WithSpanInstrumentationTest.groovy b/instrumentation/rxjava/rxjava-2.0/javaagent/src/test/groovy/RxJava2WithSpanInstrumentationTest.groovy index 0c089dbea6c5..3f24b3309af2 100644 --- a/instrumentation/rxjava/rxjava-2.0/javaagent/src/test/groovy/RxJava2WithSpanInstrumentationTest.groovy +++ b/instrumentation/rxjava/rxjava-2.0/javaagent/src/test/groovy/RxJava2WithSpanInstrumentationTest.groovy @@ -913,7 +913,7 @@ class RxJava2WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } - def "should capture span for eventually errored ParallelFlowable"() { + def "should capture span for canceled ParallelFlowable"() { setup: def source = UnicastProcessor. create() def observer = new TestSubscriber() diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy b/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy index dbf9baec4b70..c196cbb98f85 100644 --- a/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy @@ -737,7 +737,7 @@ class RxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } - def "should capture span for eventually errored Flowable"() { + def "should capture span for canceled Flowable"() { setup: def source = UnicastProcessor. create() def observer = new TestSubscriber() From b4e7a6badd27949142831e135d8dd9baa0d6a620 Mon Sep 17 00:00:00 2001 From: "Spindler, Justin" Date: Wed, 2 Jun 2021 21:11:55 -0400 Subject: [PATCH 12/12] Refactor RxJava 2/3 javaagent classes to javaagent package --- .../instrumentation/rxjava2/RxJava2InstrumentationModule.java | 2 +- .../instrumentation/rxjava2/RxJavaPluginsInstrumentation.java | 2 +- .../instrumentation/rxjava2/TracingAssemblyActivation.java | 3 ++- .../instrumentation/rxjava3/RxJava3InstrumentationModule.java | 2 +- .../instrumentation/rxjava3/RxJavaPluginsInstrumentation.java | 2 +- .../instrumentation/rxjava3/TracingAssemblyActivation.java | 3 ++- 6 files changed, 8 insertions(+), 6 deletions(-) rename instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/{ => javaagent}/instrumentation/rxjava2/RxJava2InstrumentationModule.java (91%) rename instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/{ => javaagent}/instrumentation/rxjava2/RxJavaPluginsInstrumentation.java (95%) rename instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/{ => javaagent}/instrumentation/rxjava2/TracingAssemblyActivation.java (88%) rename instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/{ => javaagent}/instrumentation/rxjava3/RxJava3InstrumentationModule.java (91%) rename instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/{ => javaagent}/instrumentation/rxjava3/RxJavaPluginsInstrumentation.java (95%) rename instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/{ => javaagent}/instrumentation/rxjava3/TracingAssemblyActivation.java (88%) diff --git a/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2InstrumentationModule.java b/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava2/RxJava2InstrumentationModule.java similarity index 91% rename from instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2InstrumentationModule.java rename to instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava2/RxJava2InstrumentationModule.java index 71e5aae55fad..b36ce0e9f9e6 100644 --- a/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJava2InstrumentationModule.java +++ b/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava2/RxJava2InstrumentationModule.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.instrumentation.rxjava2; +package io.opentelemetry.javaagent.instrumentation.rxjava2; import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; diff --git a/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJavaPluginsInstrumentation.java b/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava2/RxJavaPluginsInstrumentation.java similarity index 95% rename from instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJavaPluginsInstrumentation.java rename to instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava2/RxJavaPluginsInstrumentation.java index 56e393ceb196..50f2a9e78382 100644 --- a/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/RxJavaPluginsInstrumentation.java +++ b/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava2/RxJavaPluginsInstrumentation.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.instrumentation.rxjava2; +package io.opentelemetry.javaagent.instrumentation.rxjava2; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.named; diff --git a/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssemblyActivation.java b/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava2/TracingAssemblyActivation.java similarity index 88% rename from instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssemblyActivation.java rename to instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava2/TracingAssemblyActivation.java index b0735d346abc..1246517241c2 100644 --- a/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssemblyActivation.java +++ b/instrumentation/rxjava/rxjava-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava2/TracingAssemblyActivation.java @@ -3,9 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.instrumentation.rxjava2; +package io.opentelemetry.javaagent.instrumentation.rxjava2; import io.opentelemetry.instrumentation.api.config.Config; +import io.opentelemetry.instrumentation.rxjava2.TracingAssembly; import java.util.concurrent.atomic.AtomicBoolean; public final class TracingAssemblyActivation { diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3InstrumentationModule.java b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/RxJava3InstrumentationModule.java similarity index 91% rename from instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3InstrumentationModule.java rename to instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/RxJava3InstrumentationModule.java index 892e5eac0d6e..8f2848abd9d7 100644 --- a/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3InstrumentationModule.java +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/RxJava3InstrumentationModule.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.instrumentation.rxjava3; +package io.opentelemetry.javaagent.instrumentation.rxjava3; import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJavaPluginsInstrumentation.java b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/RxJavaPluginsInstrumentation.java similarity index 95% rename from instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJavaPluginsInstrumentation.java rename to instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/RxJavaPluginsInstrumentation.java index 7d48b39cfd50..966b75f2a755 100644 --- a/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJavaPluginsInstrumentation.java +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/RxJavaPluginsInstrumentation.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.instrumentation.rxjava3; +package io.opentelemetry.javaagent.instrumentation.rxjava3; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.named; diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyActivation.java b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/TracingAssemblyActivation.java similarity index 88% rename from instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyActivation.java rename to instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/TracingAssemblyActivation.java index 8b276353d279..52699d9a4975 100644 --- a/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingAssemblyActivation.java +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/TracingAssemblyActivation.java @@ -3,9 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.instrumentation.rxjava3; +package io.opentelemetry.javaagent.instrumentation.rxjava3; import io.opentelemetry.instrumentation.api.config.Config; +import io.opentelemetry.instrumentation.rxjava3.TracingAssembly; import java.util.concurrent.atomic.AtomicBoolean; public final class TracingAssemblyActivation {