From 910d177e6c296b452a9ceeeb5627583e7f9d33b3 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Tue, 22 Nov 2022 18:25:59 +0200 Subject: [PATCH] Rocketmq 5: set context for async callback (#7238) Run callbacks added to the `CompletableFuture` returned from `sendAsync` with the context that was used when `sendAsync` was called. Add test for capturing message headers. --- .../v5_0/CompletableFutureWrapper.java | 32 ++ .../v5_0/ProducerImplInstrumentation.java | 25 +- .../v5_0/AbstractRocketMqClientTest.java | 397 +++++++++++++----- 3 files changed, 347 insertions(+), 107 deletions(-) create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/CompletableFutureWrapper.java diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/CompletableFutureWrapper.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/CompletableFutureWrapper.java new file mode 100644 index 000000000000..b438e3152c77 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/CompletableFutureWrapper.java @@ -0,0 +1,32 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import java.util.concurrent.CompletableFuture; + +public final class CompletableFutureWrapper { + + private CompletableFutureWrapper() {} + + public static CompletableFuture wrap(CompletableFuture future) { + CompletableFuture result = new CompletableFuture<>(); + Context context = Context.current(); + future.whenComplete( + (T value, Throwable throwable) -> { + try (Scope ignored = context.makeCurrent()) { + if (throwable != null) { + result.completeExceptionally(throwable); + } else { + result.complete(value); + } + } + }); + + return result; + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ProducerImplInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ProducerImplInstrumentation.java index 1972d98d29f4..00c96f6fa9ba 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ProducerImplInstrumentation.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ProducerImplInstrumentation.java @@ -5,6 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; +import static io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0.RocketMqSingletons.producerInstrumenter; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isPrivate; import static net.bytebuddy.matcher.ElementMatchers.named; @@ -17,9 +18,11 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.List; +import java.util.concurrent.CompletableFuture; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; +import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl; import org.apache.rocketmq.client.java.message.PublishingMessageImpl; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures; @@ -52,6 +55,13 @@ public void transform(TypeTransformer transformer) { .and(takesArgument(4, List.class)) .and(takesArgument(5, int.class)), ProducerImplInstrumentation.class.getName() + "$SendAdvice"); + + transformer.applyAdviceToMethod( + isMethod() + .and(named("sendAsync")) + .and(takesArguments(1)) + .and(takesArgument(0, named("org.apache.rocketmq.client.apis.message.Message"))), + ProducerImplInstrumentation.class.getName() + "$SendAsyncAdvice"); } @SuppressWarnings("unused") @@ -60,8 +70,7 @@ public static class SendAdvice { public static void onEnter( @Advice.Argument(0) SettableFuture> future0, @Advice.Argument(4) List messages) { - Instrumenter instrumenter = - RocketMqSingletons.producerInstrumenter(); + Instrumenter instrumenter = producerInstrumenter(); int count = messages.size(); List> futures = FutureConverter.convert(future0, count); for (int i = 0; i < count; i++) { @@ -90,4 +99,16 @@ public static void onEnter( } } } + + @SuppressWarnings("unused") + public static class SendAsyncAdvice { + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit( + @Advice.Return(readOnly = false) CompletableFuture future, + @Advice.Thrown Throwable throwable) { + if (throwable == null) { + future = CompletableFutureWrapper.wrap(future); + } + } + } } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java index 58f7e4354024..d4ed1e572b78 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java @@ -5,6 +5,7 @@ package io.opentelemetry.instrumentation.rocketmqclient.v5_0; +import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_KIND; @@ -18,18 +19,25 @@ import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.util.ThrowingSupplier; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import io.opentelemetry.sdk.testing.assertj.SpanDataAssert; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.StatusData; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; @@ -38,37 +46,37 @@ import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; +import org.apache.rocketmq.client.java.impl.ClientImpl; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +@TestInstance(TestInstance.Lifecycle.PER_CLASS) public abstract class AbstractRocketMqClientTest { + // Inner topic of the container. + private static final String topic = "normal-topic-0"; + private static final String tag = "tagA"; + private static final String consumerGroup = "group-normal-topic-0"; + private static final RocketMqProxyContainer container = new RocketMqProxyContainer(); + private final ClientServiceProvider provider = ClientServiceProvider.loadService(); + private PushConsumer consumer; + private Producer producer; protected abstract InstrumentationExtension testing(); @BeforeAll - static void setUp() { + void setUp() throws ClientException { container.start(); - } - - @AfterAll - static void tearDown() { - container.close(); - } - - @Test - void testSendAndConsumeMessage() throws Throwable { ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(container.endpoints).build(); // Inner topic of the container. - String topic = "normal-topic-0"; ClientServiceProvider provider = ClientServiceProvider.loadService(); String consumerGroup = "group-normal-topic-0"; - String tag = "tagA"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); - try (PushConsumer ignored = + consumer = provider .newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) @@ -76,100 +84,279 @@ void testSendAndConsumeMessage() throws Throwable { .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener( messageView -> { - testing().runWithSpan("child", () -> {}); + testing().runWithSpan("messageListener", () -> {}); return ConsumeResult.SUCCESS; }) - .build()) { - try (Producer producer = - provider - .newProducerBuilder() - .setClientConfiguration(clientConfiguration) - .setTopics(topic) - .build()) { - - String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; - byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); - Message message = - provider - .newMessageBuilder() - .setTopic(topic) - .setTag(tag) - .setKeys(keys) - .setBody(body) - .build(); - - SendReceipt sendReceipt = - testing() - .runWithSpan( - "parent", - (ThrowingSupplier) () -> producer.send(message)); - AtomicReference sendSpanData = new AtomicReference<>(); - testing() - .waitAndAssertTraces( - trace -> { - trace.hasSpansSatisfyingExactly( - span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), - span -> - span.hasKind(SpanKind.PRODUCER) - .hasName(topic + " send") - .hasStatus(StatusData.unset()) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly( - equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), - equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), - equalTo(MESSAGING_ROCKETMQ_MESSAGE_TYPE, NORMAL), - equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), - equalTo(MESSAGING_SYSTEM, "rocketmq"), - equalTo( - MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()), - equalTo( - MESSAGING_DESTINATION_KIND, - SemanticAttributes.MessagingDestinationKindValues.TOPIC), - equalTo(MESSAGING_DESTINATION, topic))); - sendSpanData.set(trace.getSpan(1)); - }, - trace -> - trace.hasSpansSatisfyingExactly( - span -> - span.hasKind(SpanKind.CONSUMER) - .hasName(topic + " receive") - .hasStatus(StatusData.unset()) - .hasAttributesSatisfyingExactly( - equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup), - equalTo(MESSAGING_SYSTEM, "rocketmq"), - equalTo( - MESSAGING_DESTINATION_KIND, - SemanticAttributes.MessagingDestinationKindValues.TOPIC), - equalTo(MESSAGING_DESTINATION, topic), - equalTo(MESSAGING_OPERATION, "receive")), - span -> - span.hasKind(SpanKind.CONSUMER) - .hasName(topic + " process") - .hasStatus(StatusData.unset()) - // Link to send span. - .hasLinks(LinkData.create(sendSpanData.get().getSpanContext())) - // As the child of receive span. - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly( - equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup), - equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), - equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), - equalTo( - MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), - equalTo(MESSAGING_SYSTEM, "rocketmq"), - equalTo( - MESSAGING_MESSAGE_ID, - sendReceipt.getMessageId().toString()), - equalTo( - MESSAGING_DESTINATION_KIND, - SemanticAttributes.MessagingDestinationKindValues.TOPIC), - equalTo(MESSAGING_DESTINATION, topic), - equalTo(MESSAGING_OPERATION, "process")), - span -> - span.hasName("child") - .hasKind(SpanKind.INTERNAL) - .hasParent(trace.getSpan(1)))); - } + .build(); + producer = + provider + .newProducerBuilder() + .setClientConfiguration(clientConfiguration) + .setTopics(topic) + .build(); + } + + @AfterAll + void tearDown() throws IOException { + if (producer != null) { + producer.close(); + } + if (consumer != null) { + // Not calling consumer.close(); because it takes a lot of time to complete + ((ClientImpl) consumer).stopAsync(); } + container.close(); + } + + @Test + void testSendAndConsumeMessage() throws Throwable { + String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; + byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); + Message message = + provider + .newMessageBuilder() + .setTopic(topic) + .setTag(tag) + .setKeys(keys) + .setBody(body) + .build(); + + SendReceipt sendReceipt = + testing() + .runWithSpan( + "parent", (ThrowingSupplier) () -> producer.send(message)); + AtomicReference sendSpanData = new AtomicReference<>(); + testing() + .waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + assertProducerSpan(span, topic, tag, keys, body, sendReceipt) + .hasParent(trace.getSpan(0))); + sendSpanData.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> assertReceiveSpan(span, topic, consumerGroup), + span -> + assertProcessSpan( + span, + sendSpanData.get(), + topic, + consumerGroup, + tag, + keys, + body, + sendReceipt) + // As the child of receive span. + .hasParent(trace.getSpan(0)), + span -> + span.hasName("messageListener") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)))); + } + + @Test + public void testSendAsyncMessage() throws Exception { + String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; + byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); + Message message = + provider + .newMessageBuilder() + .setTopic(topic) + .setTag(tag) + .setKeys(keys) + .setBody(body) + .build(); + + SendReceipt sendReceipt = + testing() + .runWithSpan( + "parent", + () -> + producer + .sendAsync(message) + .whenComplete( + (result, throwable) -> { + testing().runWithSpan("child", () -> {}); + }) + .get()); + AtomicReference sendSpanData = new AtomicReference<>(); + testing() + .waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent"), + span -> + assertProducerSpan(span, topic, tag, keys, body, sendReceipt) + .hasParent(trace.getSpan(0)), + span -> span.hasName("child")); + sendSpanData.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> assertReceiveSpan(span, topic, consumerGroup), + span -> + assertProcessSpan( + span, + sendSpanData.get(), + topic, + consumerGroup, + tag, + keys, + body, + sendReceipt) + // As the child of receive span. + .hasParent(trace.getSpan(0)), + span -> + span.hasName("messageListener") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)))); + } + + @Test + public void testCapturedMessageHeaders() throws Throwable { + String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; + byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); + Message message = + provider + .newMessageBuilder() + .setTopic(topic) + .setTag(tag) + .setKeys(keys) + .setBody(body) + .addProperty("test-message-header", "test") + .build(); + + SendReceipt sendReceipt = + testing() + .runWithSpan( + "parent", (ThrowingSupplier) () -> producer.send(message)); + AtomicReference sendSpanData = new AtomicReference<>(); + testing() + .waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + assertProducerSpan( + span, + topic, + tag, + keys, + body, + sendReceipt, + equalTo( + AttributeKey.stringArrayKey( + "messaging.header.test_message_header"), + Arrays.asList(new String[] {"test"}))) + .hasParent(trace.getSpan(0))); + sendSpanData.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> assertReceiveSpan(span, topic, consumerGroup), + span -> + assertProcessSpan( + span, + sendSpanData.get(), + topic, + consumerGroup, + tag, + keys, + body, + sendReceipt, + equalTo( + AttributeKey.stringArrayKey( + "messaging.header.test_message_header"), + Arrays.asList(new String[] {"test"}))) + // As the child of receive span. + .hasParent(trace.getSpan(0)), + span -> + span.hasName("messageListener") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)))); + } + + private static SpanDataAssert assertProducerSpan( + SpanDataAssert span, + String topic, + String tag, + String[] keys, + byte[] body, + SendReceipt sendReceipt, + AttributeAssertion... extraAttributes) { + List attributeAssertions = + new ArrayList<>( + Arrays.asList( + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TYPE, NORMAL), + equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION, topic))); + attributeAssertions.addAll(Arrays.asList(extraAttributes)); + + return span.hasKind(SpanKind.PRODUCER) + .hasName(topic + " send") + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly(attributeAssertions); + } + + private static SpanDataAssert assertReceiveSpan( + SpanDataAssert span, String topic, String consumerGroup) { + return span.hasKind(SpanKind.CONSUMER) + .hasName(topic + " receive") + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION, topic), + equalTo(MESSAGING_OPERATION, "receive")); + } + + private static SpanDataAssert assertProcessSpan( + SpanDataAssert span, + SpanData linkedSpan, + String topic, + String consumerGroup, + String tag, + String[] keys, + byte[] body, + SendReceipt sendReceipt, + AttributeAssertion... extraAttributes) { + List attributeAssertions = + new ArrayList<>( + Arrays.asList( + equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION, topic), + equalTo(MESSAGING_OPERATION, "process"))); + attributeAssertions.addAll(Arrays.asList(extraAttributes)); + + return span.hasKind(SpanKind.CONSUMER) + .hasName(topic + " process") + .hasStatus(StatusData.unset()) + // Link to send span. + .hasLinks(LinkData.create(linkedSpan.getSpanContext())) + .hasAttributesSatisfyingExactly(attributeAssertions); } }