diff --git a/instrumentation/jms-1.1/javaagent/src/jms2Test/groovy/Jms2Test.groovy b/instrumentation/jms-1.1/javaagent/src/jms2Test/groovy/Jms2Test.groovy index ab0dc6021d01..d551ec869ea4 100644 --- a/instrumentation/jms-1.1/javaagent/src/jms2Test/groovy/Jms2Test.groovy +++ b/instrumentation/jms-1.1/javaagent/src/jms2Test/groovy/Jms2Test.groovy @@ -162,7 +162,7 @@ class Jms2Test extends AgentInstrumentationSpecification { expect: receivedMessage == null // span is not created if no message is received - assertTraces(0,{}) + assertTraces(0, {}) cleanup: consumer.close() @@ -194,6 +194,47 @@ class Jms2Test extends AgentInstrumentationSpecification { session.createTopic("someTopic") | "topic" | "someTopic" } + def "sending a message to #destinationName #destinationType with explicit destination propagates context"() { + given: + def producer = session.createProducer(null) + def consumer = session.createConsumer(destination) + + def lock = new CountDownLatch(1) + def messageRef = new AtomicReference() + consumer.setMessageListener new MessageListener() { + @Override + void onMessage(Message message) { + lock.await() // ensure the producer trace is reported first. + messageRef.set(message) + } + } + + when: + producer.send(destination, message) + lock.countDown() + + then: + assertTraces(1) { + trace(0, 2) { + producerSpan(it, 0, destinationType, destinationName) + consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process") + } + } + // This check needs to go after all traces have been accounted for + messageRef.get().text == messageText + + cleanup: + producer.close() + consumer.close() + + where: + destination | destinationType | destinationName + session.createQueue("someQueue") | "queue" | "someQueue" + session.createTopic("someTopic") | "topic" | "someTopic" + session.createTemporaryQueue() | "queue" | "(temporary)" + session.createTemporaryTopic() | "topic" | "(temporary)" + } + static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) { trace.span(index) { name destinationName + " send" diff --git a/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsMessageProducerInstrumentation.java b/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsMessageProducerInstrumentation.java index 9ce5a6da0ebd..016bec033c89 100644 --- a/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsMessageProducerInstrumentation.java +++ b/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsMessageProducerInstrumentation.java @@ -77,9 +77,7 @@ public static void onEnter( MessageDestination messageDestination = tracer().extractDestination(message, defaultDestination); context = tracer().startProducerSpan(messageDestination, message); - // TODO: why are we propagating context only in this advice class? the other one does not - // inject current span context into JMS message - scope = tracer().startProducerScope(context, message); + scope = context.makeCurrent(); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @@ -90,9 +88,9 @@ public static void stopSpan( if (scope == null) { return; } - scope.close(); CallDepthThreadLocalMap.reset(MessageProducer.class); + scope.close(); if (throwable != null) { tracer().endExceptionally(context, throwable); } else { @@ -129,6 +127,7 @@ public static void stopSpan( } CallDepthThreadLocalMap.reset(MessageProducer.class); + scope.close(); if (throwable != null) { tracer().endExceptionally(context, throwable); } else { diff --git a/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsTracer.java b/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsTracer.java index 23682bbd6f14..f71f9ed173dd 100644 --- a/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsTracer.java +++ b/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsTracer.java @@ -10,10 +10,8 @@ import static io.opentelemetry.javaagent.instrumentation.jms.MessageExtractAdapter.GETTER; import static io.opentelemetry.javaagent.instrumentation.jms.MessageInjectAdapter.SETTER; -import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.trace.SpanBuilder; import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.tracer.BaseTracer; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import java.util.concurrent.TimeUnit; @@ -43,12 +41,7 @@ public Context startConsumerSpan( MessageDestination destination, String operation, Message message, long startTime) { Context parentContext = Context.root(); if (message != null && "process".equals(operation)) { - // TODO use BaseTracer.extract() which has context leak detection - // (and fix the context leak that it is currently detecting when running Jms2Test) - parentContext = - GlobalOpenTelemetry.getPropagators() - .getTextMapPropagator() - .extract(Context.root(), message, GETTER); + parentContext = extract(message, GETTER); } SpanBuilder spanBuilder = @@ -64,12 +57,9 @@ public Context startProducerSpan(MessageDestination destination, Message message Context parentContext = Context.current(); SpanBuilder span = spanBuilder(parentContext, spanName(destination, "send"), PRODUCER); afterStart(span, destination, message); - return parentContext.with(span.startSpan()); - } - - public Scope startProducerScope(Context context, Message message) { + Context context = parentContext.with(span.startSpan()); inject(context, message, SETTER); - return context.makeCurrent(); + return context; } public String spanName(MessageDestination destination, String operation) { diff --git a/instrumentation/jms-1.1/javaagent/src/test/groovy/Jms1Test.groovy b/instrumentation/jms-1.1/javaagent/src/test/groovy/Jms1Test.groovy index ce64b994034e..f5ab8d4138c2 100644 --- a/instrumentation/jms-1.1/javaagent/src/test/groovy/Jms1Test.groovy +++ b/instrumentation/jms-1.1/javaagent/src/test/groovy/Jms1Test.groovy @@ -24,7 +24,9 @@ import org.slf4j.LoggerFactory import org.testcontainers.containers.GenericContainer import org.testcontainers.containers.output.Slf4jLogConsumer import spock.lang.Shared +import spock.lang.Unroll +@Unroll class Jms1Test extends AgentInstrumentationSpecification { private static final Logger logger = LoggerFactory.getLogger(Jms1Test) @@ -224,6 +226,47 @@ class Jms1Test extends AgentInstrumentationSpecification { session.createTemporaryTopic() | "topic" | "(temporary)" } + def "sending a message to #destinationName #destinationType with explicit destination propagates context"() { + given: + def producer = session.createProducer(null) + def consumer = session.createConsumer(destination) + + def lock = new CountDownLatch(1) + def messageRef = new AtomicReference() + consumer.setMessageListener new MessageListener() { + @Override + void onMessage(Message message) { + lock.await() // ensure the producer trace is reported first. + messageRef.set(message) + } + } + + when: + producer.send(destination, message) + lock.countDown() + + then: + assertTraces(1) { + trace(0, 2) { + producerSpan(it, 0, destinationType, destinationName) + consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process") + } + } + // This check needs to go after all traces have been accounted for + messageRef.get().text == messageText + + cleanup: + producer.close() + consumer.close() + + where: + destination | destinationType | destinationName + session.createQueue("someQueue") | "queue" | "someQueue" + session.createTopic("someTopic") | "topic" | "someTopic" + session.createTemporaryQueue() | "queue" | "(temporary)" + session.createTemporaryTopic() | "topic" | "(temporary)" + } + static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) { trace.span(index) { name destinationName + " send" diff --git a/javaagent-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/concurrent/ExecutorInstrumentationUtils.java b/javaagent-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/concurrent/ExecutorInstrumentationUtils.java index 2410bb676765..c68e770362d7 100644 --- a/javaagent-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/concurrent/ExecutorInstrumentationUtils.java +++ b/javaagent-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/concurrent/ExecutorInstrumentationUtils.java @@ -73,6 +73,13 @@ protected Boolean computeValue(Class taskClass) { if (enclosingClass.getName().equals("com.squareup.okhttp.ConnectionPool")) { return false; } + + // Avoid instrumenting internal OrderedExecutor worker class + if (enclosingClass + .getName() + .equals("org.hornetq.utils.OrderedExecutorFactory$OrderedExecutor")) { + return false; + } } return true;