From 1d9029d45e1549a9c879ecb98a144d97b7929073 Mon Sep 17 00:00:00 2001 From: Hannah Arndt Date: Wed, 8 Jan 2025 17:18:54 +0100 Subject: [PATCH] Added helper methods for manual spans in mutiny pipelines. In a traditional blocking and synchronous framework the opentelemetry context is attached to ThreadLocal. In reactive programming, where multiple processings share the same event-loop thread, one has to use Vert.x duplicated contexts instead (see https://quarkus.io/guides/duplicated-context). wrapWithSpan ensures that the pipeline is executed on a duplicated context (If the current context already is duplicated, it will stay the same. Therefore, nested calls to wrapWithSpan will all run on the same vert.x context). inspired by Jan Peremsky (https://github.com/jan-peremsky/quarkus-reactive-otel/blob/c74043d388ec4df155f466f1d6938931c3389b70/src/main/java/com/fng/ewallet/pex/Tracer.java) and edeandrea (https://github.com/quarkusio/quarkus-super-heroes/blob/main/event-statistics/src/main/java/io/quarkus/sample/superheroes/statistics/listener/SuperStats.java) suggestions from review I will squash with the previous commit once everything is approved suggestion from reviewer todo: squash when everything is done -> adapt commit message. In this solution span and scope are local variables. no need for a stack. unnecessary line removed fix test to run on different contexts again cleanup --- .../main/asciidoc/opentelemetry-tracing.adoc | 54 +++ extensions/opentelemetry/deployment/pom.xml | 5 + .../traces/MutinyTracingHelperTest.java | 313 ++++++++++++++++++ .../tracing/mutiny/MutinyTracingHelper.java | 121 +++++++ 4 files changed, 493 insertions(+) create mode 100644 extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/traces/MutinyTracingHelperTest.java create mode 100644 extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/mutiny/MutinyTracingHelper.java diff --git a/docs/src/main/asciidoc/opentelemetry-tracing.adoc b/docs/src/main/asciidoc/opentelemetry-tracing.adoc index 4c5944576ee1d..a7e8505f8e405 100644 --- a/docs/src/main/asciidoc/opentelemetry-tracing.adoc +++ b/docs/src/main/asciidoc/opentelemetry-tracing.adoc @@ -526,6 +526,60 @@ public void tracedWork() { } ---- +=== Mutiny +Methods returning reactive types can also be annotated with `@WithSpan` and `@AddingSpanAttributes` to create a new span or add attributes to the current span. + +If you need to create spans manually within a mutiny pipeline, use `wrapWithSpan` method from `io.quarkus.opentelemetry.runtime.tracing.mutiny.MutinyTracingHelper`. + +Example. Assuming you have the following pipeline: +[source,java] +---- +Uni uni = Uni.createFrom().item("hello") + //start trace here + .onItem().transform(item -> item + " world") + .onItem().transform(item -> item + "!") + //end trace here + .subscribe().with( + item -> System.out.println("Received: " + item), + failure -> System.out.println("Failed with " + failure) + ); +---- +wrap it like this: +[source,java] +---- +import static io.quarkus.opentelemetry.runtime.tracing.mutiny.MutinyTracingHelper.wrapWithSpan; +... +@Inject +Tracer tracer; +... +Context context = Context.current(); +Uni uni = Uni.createFrom().item("hello") + .transformToUni(m -> wrapWithSpan(tracer, Optional.of(context), "my-span-name", + Uni.createFrom().item(m) + .onItem().transform(item -> item + " world") + .onItem().transform(item -> item + "!") + )) + .subscribe().with( + item -> System.out.println("Received: " + item), + failure -> System.out.println("Failed with " + failure) + ); + +---- +for multi-pipelines it works similarly: +[source,java] +---- +Multi.createFrom().items("Alice", "Bob", "Charlie") + .transformToMultiAndConcatenate(m -> TracingHelper.withTrace("my-span-name", + Multi.createFrom().item(m) + .onItem().transform(name -> "Hello " + name) + )) + .subscribe().with( + item -> System.out.println("Received: " + item), + failure -> System.out.println("Failed with " + failure) + ); +---- +Instead of `transformToMultiAndConcatenate` you can use `transformToMultiAndMerge` if you don't care about the order of the items. + === Quarkus Messaging - Kafka When using the Quarkus Messaging extension for Kafka, diff --git a/extensions/opentelemetry/deployment/pom.xml b/extensions/opentelemetry/deployment/pom.xml index cac0090542ced..f0b9e0ef6d2a4 100644 --- a/extensions/opentelemetry/deployment/pom.xml +++ b/extensions/opentelemetry/deployment/pom.xml @@ -90,6 +90,11 @@ vertx-web-client test + + io.smallrye.reactive + smallrye-mutiny-vertx-junit5 + test + io.quarkus quarkus-reactive-routes-deployment diff --git a/extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/traces/MutinyTracingHelperTest.java b/extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/traces/MutinyTracingHelperTest.java new file mode 100644 index 0000000000000..58313b1442200 --- /dev/null +++ b/extensions/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/traces/MutinyTracingHelperTest.java @@ -0,0 +1,313 @@ +package io.quarkus.opentelemetry.deployment.traces; + +import static io.quarkus.opentelemetry.runtime.tracing.mutiny.MutinyTracingHelper.wrapWithSpan; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Stream; + +import jakarta.inject.Inject; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.quarkus.opentelemetry.deployment.common.exporter.TestSpanExporter; +import io.quarkus.opentelemetry.deployment.common.exporter.TestSpanExporterProvider; +import io.quarkus.opentelemetry.runtime.QuarkusContextStorage; +import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.common.vertx.VertxContext; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.helpers.test.AssertSubscriber; +import io.smallrye.mutiny.helpers.test.UniAssertSubscriber; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +class MutinyTracingHelperTest { + + @RegisterExtension + static final QuarkusUnitTest TEST = new QuarkusUnitTest() + .setArchiveProducer( + () -> ShrinkWrap.create(JavaArchive.class) + .addClasses(TestSpanExporter.class, TestSpanExporterProvider.class) + .addAsResource(new StringAsset(TestSpanExporterProvider.class.getCanonicalName()), + "META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider")); + + @Inject + private TestSpanExporter spanExporter; + + @Inject + private Tracer tracer; + + @Inject + private Vertx vertx; + + @AfterEach + void tearDown() { + spanExporter.reset(); + } + + @ParameterizedTest(name = "{index}: Simple uni pipeline {1}") + @MethodSource("generateContextRunners") + void testSimpleUniPipeline(final String contextType, final String contextName) { + + final UniAssertSubscriber subscriber = Uni.createFrom() + .item("Hello") + .emitOn(r -> runOnContext(r, vertx, contextType)) + .onItem() + .transformToUni(m -> wrapWithSpan(tracer, "testSpan", + Uni.createFrom().item(m).onItem().transform(s -> { + final Span span = tracer.spanBuilder("subspan").startSpan(); + try (final Scope scope = span.makeCurrent()) { + return s + " world"; + } finally { + span.end(); + } + }))) + .subscribe() + .withSubscriber(new UniAssertSubscriber<>()); + + subscriber.awaitItem().assertItem("Hello world"); + + //ensure there are two spans with subspan as child of testSpan + final List spans = spanExporter.getFinishedSpanItems(2); + assertThat(spans.stream().map(SpanData::getName)).containsExactlyInAnyOrder("testSpan", "subspan"); + assertChildSpan(spans, "testSpan", "subspan"); + } + + @ParameterizedTest(name = "{index}: Explicit parent {1}") + @MethodSource("generateContextRunners") + void testSpanWithExplicitParent(final String contextType, final String contextName) { + + final String parentSpanName = "parentSpan"; + final String pipelineSpanName = "pipelineSpan"; + final String subspanName = "subspan"; + + final Span parentSpan = tracer.spanBuilder(parentSpanName).startSpan(); + final io.opentelemetry.context.Context parentContext = io.opentelemetry.context.Context.current().with(parentSpan); + + final UniAssertSubscriber subscriber = Uni.createFrom() + .item("Hello") + .emitOn(r -> runOnContext(r, vertx, contextType)) + .onItem() + .transformToUni(m -> wrapWithSpan(tracer, Optional.of(parentContext), + pipelineSpanName, + Uni.createFrom().item(m).onItem().transform(s -> { + final Span span = tracer.spanBuilder(subspanName).startSpan(); + try (final Scope scope = span.makeCurrent()) { + return s + " world"; + } finally { + span.end(); + } + }))) + .subscribe() + .withSubscriber(new UniAssertSubscriber<>()); + + subscriber.awaitItem().assertItem("Hello world"); + parentSpan.end(); + + //ensure there are 3 spans with proper parent-child relationships + final List spans = spanExporter.getFinishedSpanItems(3); + assertThat(spans.stream().map(SpanData::getName)).containsExactlyInAnyOrder(parentSpanName, pipelineSpanName, + subspanName); + assertChildSpan(spans, parentSpanName, pipelineSpanName); + assertChildSpan(spans, pipelineSpanName, subspanName); + } + + @ParameterizedTest(name = "{index}: Nested uni pipeline with implicit parent {1}") + @MethodSource("generateContextRunners") + void testNestedPipeline_implicitParent(final String contextType, + final String contextName) { + + final String parentSpanName = "parentSpan"; + final String childSpanName = "childSpan"; + + final UniAssertSubscriber subscriber = Uni.createFrom() + .item("test") + .emitOn(r -> runOnContext(r, vertx, contextType)) + .onItem() + .transformToUni(m -> wrapWithSpan(tracer, parentSpanName, + Uni.createFrom().item(m) + .onItem().transform(s -> s + " in outer span") + .onItem().transformToUni(m1 -> wrapWithSpan(tracer, childSpanName, + Uni.createFrom().item(m1) + .onItem().transform(s -> "now in inner span"))) + + )) + .subscribe() + .withSubscriber(new UniAssertSubscriber<>()); + + subscriber.awaitItem(); + + //ensure there are 2 spans with doSomething and doSomethingAsync as children of testSpan + final List spans = spanExporter.getFinishedSpanItems(2); + assertThat(spans.stream().map(SpanData::getName)).containsExactlyInAnyOrder(parentSpanName, childSpanName); + assertChildSpan(spans, parentSpanName, childSpanName); + } + + @ParameterizedTest(name = "{index}: Nested uni pipeline with explicit no parent {1}") + @MethodSource("generateContextRunners") + void testNestedPipeline_explicitNoParent(final String contextType, final String contextName) { + + final String parentSpanName = "parentSpan"; + final String childSpanName = "childSpan"; + + final UniAssertSubscriber subscriber = Uni.createFrom() + .item("test") + .emitOn(r -> runOnContext(r, vertx, contextType)) + .onItem() + .transformToUni(m -> wrapWithSpan(tracer, parentSpanName, + Uni.createFrom().item(m) + .onItem().transform(s -> s + " in outer span") + .onItem().transformToUni(m1 -> wrapWithSpan(tracer, Optional.empty(), childSpanName, + Uni.createFrom().item(m1) + .onItem().transform(s -> "now in inner span"))) + + )) + .subscribe() + .withSubscriber(new UniAssertSubscriber<>()); + + subscriber.awaitItem(); + + //ensure there are 2 spans but without parent-child relationship + final List spans = spanExporter.getFinishedSpanItems(2); + assertThat(spans.stream().map(SpanData::getName)).containsExactlyInAnyOrder(parentSpanName, childSpanName); + assertThat(spans.stream() + .filter(span -> span.getName().equals(childSpanName)) + .findAny() + .orElseThrow() + .getParentSpanId()).isEqualTo("0000000000000000");//signifies no parent + } + + @ParameterizedTest(name = "{index}: Concatenating multi pipeline {1}") + @MethodSource("generateContextRunners") + void testSimpleMultiPipeline_Concatenate(final String contextType, final String contextName) { + + final AssertSubscriber subscriber = Multi.createFrom() + .items("test1", "test2", "test3") + .emitOn(r -> runOnContext(r, vertx, contextType)) + .onItem() + .transformToUniAndConcatenate(m -> wrapWithSpan(tracer, Optional.empty(), "testSpan " + m, + //the traced pipeline + Uni.createFrom().item(m).onItem().transform(s -> { + final Span span = tracer.spanBuilder("subspan " + s).startSpan(); + try (final Scope scope = span.makeCurrent()) { + return s + " transformed"; + } finally { + span.end(); + } + }))) + .subscribe() + .withSubscriber(AssertSubscriber.create(3)); + + subscriber.awaitCompletion().assertItems("test1 transformed", "test2 transformed", "test3 transformed"); + + //ensure there are six spans with three pairs of subspan as child of testSpan + final List spans = spanExporter.getFinishedSpanItems(6); + for (int i = 1; i <= 3; i++) { + final int currentI = i; + assertThat(spans.stream().anyMatch(span -> span.getName().equals("testSpan test" + currentI))).isTrue(); + assertThat(spans.stream().anyMatch(span -> span.getName().equals("subspan test" + currentI))).isTrue(); + assertChildSpan(spans, "testSpan test" + currentI, "subspan test" + currentI); + } + } + + @ParameterizedTest(name = "{index}: Merging multi pipeline {1}") + @MethodSource("generateContextRunners") + void testSimpleMultiPipeline_Merge(final String contextType, final String contextName) { + + final AssertSubscriber subscriber = Multi.createFrom() + .items("test1", "test2", "test3") + .emitOn(r -> runOnContext(r, vertx, contextType)) + .onItem() + .transformToUniAndMerge(m -> wrapWithSpan(tracer, Optional.empty(), "testSpan " + m, + Uni.createFrom().item(m).onItem().transform(s -> { + final Span span = tracer.spanBuilder("subspan " + s).startSpan(); + try (final Scope scope = span.makeCurrent()) { + return s + " transformed"; + } finally { + span.end(); + } + }))) + .subscribe() + .withSubscriber(AssertSubscriber.create(3)); + + subscriber.awaitCompletion(); + + //ensure there are six spans with three pairs of subspan as child of testSpan + final List spans = spanExporter.getFinishedSpanItems(6); + for (int i = 1; i <= 3; i++) { + final int currentI = i; + assertThat(spans.stream().anyMatch(span -> span.getName().equals("testSpan test" + currentI))).isTrue(); + assertThat(spans.stream().anyMatch(span -> span.getName().equals("subspan test" + currentI))).isTrue(); + assertChildSpan(spans, "testSpan test" + currentI, "subspan test" + currentI); + } + } + + private static void assertChildSpan(final List spans, final String parentSpanName, + final String childSpanName1) { + assertThat(spans.stream() + .filter(span -> span.getName().equals(childSpanName1)) + .findAny() + .orElseThrow() + .getParentSpanId()).isEqualTo( + spans.stream().filter(span -> span.getName().equals(parentSpanName)).findAny().get().getSpanId()); + } + + private static Stream generateContextRunners() { + return Stream.of( + Arguments.of("WITHOUT_CONTEXT", "Without Context"), + Arguments.of("ROOT_CONTEXT", "On Root Context"), + Arguments.of("DUPLICATED_CONTEXT", "On Duplicated Context")); + } + + private void runOnContext(final Runnable runnable, final Vertx vertx, final String contextType) { + switch (contextType) { + case "WITHOUT_CONTEXT": + runWithoutContext(runnable); + break; + case "ROOT_CONTEXT": + runOnRootContext(runnable, vertx); + break; + case "DUPLICATED_CONTEXT": + runOnDuplicatedContext(runnable, vertx); + break; + default: + throw new IllegalArgumentException("Unknown context type: " + contextType); + } + } + + private static void runWithoutContext(final Runnable runnable) { + assertThat(QuarkusContextStorage.getVertxContext()).isNull(); + runnable.run(); + } + + private static void runOnRootContext(final Runnable runnable, final Vertx vertx) { + final Context rootContext = VertxContext.getRootContext(vertx.getOrCreateContext()); + assertThat(rootContext).isNotNull(); + assertThat(VertxContext.isDuplicatedContext(rootContext)).isFalse(); + assertThat(rootContext).isNotEqualTo(QuarkusContextStorage.getVertxContext()); + + rootContext.runOnContext(v -> runnable.run()); + } + + private static void runOnDuplicatedContext(final Runnable runnable, final Vertx vertx) { + final Context duplicatedContext = VertxContext.createNewDuplicatedContext(vertx.getOrCreateContext()); + assertThat(duplicatedContext).isNotNull(); + assertThat(VertxContext.isDuplicatedContext(duplicatedContext)).isTrue(); + + duplicatedContext.runOnContext(v -> runnable.run()); + } + +} diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/mutiny/MutinyTracingHelper.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/mutiny/MutinyTracingHelper.java new file mode 100644 index 0000000000000..a1197a0174312 --- /dev/null +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/mutiny/MutinyTracingHelper.java @@ -0,0 +1,121 @@ +package io.quarkus.opentelemetry.runtime.tracing.mutiny; + +import java.util.Optional; +import java.util.concurrent.CancellationException; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import io.quarkus.opentelemetry.runtime.QuarkusContextStorage; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; + +public class MutinyTracingHelper { + + /** + * Wraps the given pipeline with a span with the given name. Ensures that subspans find the current span as context, + * by running on a duplicated context. The span will be closed when the pipeline completes. + * If there is already a span in the current context, it will be used as parent for the new span. + *

+ * Use as follows: + * Given this existing pipeline: + * ```java + * Uni.createFrom().item("Hello") + * .onItem().transform(s -> s + " World") + * .subscribe().with(System.out::println); + * ``` + * wrap like this: + * ```java + * Uni.createFrom().item("Hello") + * .onItem().transformToUni(s -> wrapWithSpan(tracer, "mySpan", Uni.createFrom().item(s + " World"))) + * .subscribe().with(System.out::println); + * ``` + *

+ * it also works with multi: + * ```java + * Multi.createFrom().items("Alice", "Bob", "Charlie") + * .onItem().transform(name -> "Hello " + name) + * .subscribe().with(System.out::println); + * ``` + * wrap like this: + * ```java + * Multi.createFrom().items("Alice", "Bob", "Charlie") + * .onItem().transformToUni(s -> wrapWithSpan(tracer, "mySpan", Uni.createFrom().item("Hello " + s) + * .onItem().transform(name -> "Hello " + name) + * )) + * .subscribe().with(System.out::println); + * ``` + * + * @param the type of the result of the pipeline + * @param spanName + * the name of the span that should be created + * @param pipeline + * the pipeline to run within the span + * + * @return the result of the pipeline + */ + public static Uni wrapWithSpan(final Tracer tracer, final String spanName, final Uni pipeline) { + + return wrapWithSpan(tracer, Optional.of(io.opentelemetry.context.Context.current()), spanName, pipeline); + } + + /** + * see {@link #wrapWithSpan(Tracer, String, Uni)} + * use this method if you manually want to specify the parent context of the new span + * or if you want to ensure the new span is a root span. + * + * @param + * @param parentContext + * the parent context to use for the new span. If empty, a new root span will be created. + * @param spanName + * the name of the span that should be created + * @param pipeline + * the pipeline to run within the span + * + * @return the result of the pipeline + */ + public static Uni wrapWithSpan(final Tracer tracer, + final Optional parentContext, + final String spanName, final Uni pipeline) { + + return runOnDuplicatedContext(Uni.createFrom().deferred(() -> { + final SpanBuilder spanBuilder = tracer.spanBuilder(spanName); + if (parentContext.isPresent()) { + spanBuilder.setParent(parentContext.get()); + } else { + spanBuilder.setNoParent(); + } + final Span span = spanBuilder.startSpan(); + final Scope scope = span.makeCurrent(); + return pipeline.onTermination() + .invoke((o, throwable, isCancelled) -> { + try { + if (Boolean.TRUE.equals(isCancelled)) { + span.recordException(new CancellationException()); + } else if (throwable != null) { + span.recordException(throwable); + } + span.end(); + } finally { + scope.close(); + } + }); + })); + } + + private static Uni runOnDuplicatedContext(final Uni deferred) { + //creates duplicate context, if the current context is not a duplicated one and not null + //Otherwise returns the current context or null + final Context context = QuarkusContextStorage.getVertxContext(); + + return deferred.runSubscriptionOn(runnable -> { + if (context != null) { + context.runOnContext(v -> runnable.run()); + } else { + runnable.run(); + } + }); + } + +}