diff --git a/brave-bom/pom.xml b/brave-bom/pom.xml index bba48e8cd9..565334d756 100644 --- a/brave-bom/pom.xml +++ b/brave-bom/pom.xml @@ -226,6 +226,11 @@ brave-instrumentation-okhttp3 ${project.version} + + ${project.groupId} + brave-instrumentation-rocketmq-client + ${project.version} + ${project.groupId} brave-instrumentation-rpc diff --git a/instrumentation/README.md b/instrumentation/README.md index 8e76945ad6..ae07ffd68e 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -20,6 +20,7 @@ Here's a brief overview of what's packaged here: * [mysql8](mysql8/README.md) - Tracing MySQL v8 statement interceptor * [netty-codec-http](netty-codec-http/README.md) - Tracing handler for [Netty](http://netty.io/) 4.x http servers * [okhttp3](okhttp3/README.md) - Tracing decorators for [OkHttp](https://github.com/square/okhttp) 3.x +* [rocketmq-client](rocketmq-client/README.md) - Tracing decorators for RocketMQ producers and consumers. * [servlet](servlet/README.md) - Tracing filter for Servlet 2.5+ (including Async) * [spring-rabbit](spring-rabbit/README.md) - Tracing MessagePostProcessor and ListenerAdvice for [Spring Rabbit](https://spring.io/guides/gs/messaging-rabbitmq/) * [spring-web](spring-web/README.md) - Tracing interceptor for [Spring RestTemplate](https://spring.io/guides/gs/consuming-rest/) diff --git a/instrumentation/pom.xml b/instrumentation/pom.xml index c0e2f5a4d4..c2dd48809d 100644 --- a/instrumentation/pom.xml +++ b/instrumentation/pom.xml @@ -51,6 +51,7 @@ mysql6 mysql8 okhttp3 + rocketmq-client rpc servlet servlet-jakarta diff --git a/instrumentation/rocketmq-client/README.md b/instrumentation/rocketmq-client/README.md new file mode 100644 index 0000000000..b20c69a629 --- /dev/null +++ b/instrumentation/rocketmq-client/README.md @@ -0,0 +1,110 @@ +# brave-instrumentation-rocketmq-client + +## Tracing for RocketMQ Client + +This module provides instrumentation for RocketMQ based services. + +## example + +### producer + +The key is to register our hook to the producer + +```java +package brave.rocketmq.client; + +import brave.Tracing; +import brave.messaging.MessagingRequest; +import brave.messaging.MessagingTracing; +import brave.sampler.SamplerFunction; +import brave.sampler.SamplerFunctions; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; + +public class ProducerExample { + + public static void main(String[] args) throws Exception { + // todo Replaced with actual tracing construct + Tracing tracing = Tracing.newBuilder().build(); + SamplerFunction producerSampler = SamplerFunctions.deferDecision(); + RocketMQTracing producerTracing = RocketMQTracing.create( + MessagingTracing.newBuilder(tracing).producerSampler(producerSampler).build()); + + String topic = "testSend"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + DefaultMQProducer producer = new DefaultMQProducer("testSend"); + // todo This is the key, register the hook to the producer + producer.getDefaultMQProducerImpl() + .registerSendMessageHook(new SendMessageBraveHookImpl(producerTracing)); + // Replace with actual address + producer.setNamesrvAddr("127.0.0.1:9876"); + producer.start(); + producer.send(message); + + producer.shutdown(); + } +} + +``` + +### consumer + +wrap `org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently` +using `brave.rocketmq.client.RocketMQTracing.wrap(long, org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly)`, +or alternatively, wrap `org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly` +using `brave.rocketmq.client.RocketMQTracing.wrap(int, org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently)`; + +```java +package brave.rocketmq.client; + +import java.util.List; +import java.util.Optional; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; + +import brave.Span; +import brave.Tracer; +import brave.Tracing; +import brave.messaging.MessagingRequest; +import brave.messaging.MessagingTracing; +import brave.rocketmq.client.RocketMQTracing; +import brave.sampler.SamplerFunction; +import brave.sampler.SamplerFunctions; + +public class ProducerExample { + + public static void main(String[] args) throws Exception { + // todo Replaced with actual tracing construct + Tracing tracing = Tracing.newBuilder().build(); + SamplerFunction producerSampler = SamplerFunctions.deferDecision(); + RocketMQTracing rocketMQTracing = RocketMQTracing.create( + MessagingTracing.newBuilder(tracing).producerSampler(producerSampler).build()); + + String topic = "testPushConsumer"; + String nameserverAddr = "127.0.0.1:9876"; + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testPushConsumer"); + consumer.setNamesrvAddr(nameserverAddr); + consumer.subscribe(topic, "*"); + MessageListenerConcurrently messageListenerConcurrently = rocketMQTracing.wrap(new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { + Span span = + Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null); + // do something + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + consumer.registerMessageListener(messageListenerConcurrently); + + consumer.start(); + } + +} + +``` + diff --git a/instrumentation/rocketmq-client/bnd.bnd b/instrumentation/rocketmq-client/bnd.bnd new file mode 100644 index 0000000000..ff1f9dad33 --- /dev/null +++ b/instrumentation/rocketmq-client/bnd.bnd @@ -0,0 +1,6 @@ +# We use brave.internal.Nullable, but it is not used at runtime. +Import-Package: \ + !brave.internal*,\ + * +Export-Package: \ + brave.rocketmq.client diff --git a/instrumentation/rocketmq-client/pom.xml b/instrumentation/rocketmq-client/pom.xml new file mode 100644 index 0000000000..59535a04c5 --- /dev/null +++ b/instrumentation/rocketmq-client/pom.xml @@ -0,0 +1,68 @@ + + + + 4.0.0 + + io.zipkin.brave + brave-instrumentation-parent + 6.0.1-SNAPSHOT + + + brave-instrumentation-rocketmq-client + Brave Instrumentation: RocketMQ Client + + + + brave.rocketmq.client + + ${project.basedir}/../.. + + 5.1.4 + + --add-opens java.base/java.nio=ALL-UNNAMED + + + + + ${project.groupId} + brave-instrumentation-messaging + ${project.version} + + + org.apache.rocketmq + rocketmq-client + ${rocketmq.version} + provided + + + + ${project.groupId} + brave-tests + test + ${project.version} + + + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + + diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTags.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTags.java new file mode 100644 index 0000000000..20d51d825a --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTags.java @@ -0,0 +1,25 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +public class RocketMQTags { + + public static final String TO_PREFIX = "To_"; + public static final String FROM_PREFIX = "From_"; + public static final String ROCKETMQ_SERVICE = "rocketmq"; + + // TODO: maybe like HttpTags.PATH if we support extended tags on first version + public static final String ROCKETMQ_TAGS = "rocketmq.tags"; + public static final String ROCKETMQ_TOPIC = "rocketmq.topic"; +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java new file mode 100644 index 0000000000..c3249cca50 --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java @@ -0,0 +1,127 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import brave.Tracer; +import brave.Tracing; +import brave.messaging.MessagingRequest; +import brave.messaging.MessagingTracing; +import brave.propagation.Propagation; +import brave.propagation.TraceContext.Extractor; +import brave.propagation.TraceContext.Injector; +import brave.propagation.TraceContextOrSamplingFlags; +import brave.sampler.SamplerFunction; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; + +import java.util.Map; + +public class RocketMQTracing { + + private static final long defaultSuspendCurrentQueueTimeMillis = 1000; + private static final int defaultDelayLevelWhenNextConsume = 0; + + public static RocketMQTracing create(Tracing tracing) { + return new RocketMQTracing(MessagingTracing.create(tracing), RocketMQTags.ROCKETMQ_SERVICE); + } + + public static RocketMQTracing create(MessagingTracing messagingTracing) { + return new RocketMQTracing(messagingTracing, RocketMQTags.ROCKETMQ_SERVICE); + } + + public static RocketMQTracing create(MessagingTracing messagingTracing, + String remoteServiceName) { + return new RocketMQTracing(messagingTracing, remoteServiceName); + } + + final Tracing tracing; + final Tracer tracer; + final Extractor producerExtractor; + final Extractor consumerExtractor; + final Injector producerInjector; + final Injector consumerInjector; + final String[] traceIdHeaders; + final SamplerFunction producerSampler, consumerSampler; + final String remoteServiceName; + + RocketMQTracing(MessagingTracing messagingTracing, + String remoteServiceName) { // intentionally hidden constructor + this.tracing = messagingTracing.tracing(); + this.tracer = tracing.tracer(); + Propagation propagation = messagingTracing.propagation(); + this.producerExtractor = propagation.extractor(TracingProducerRequest.GETTER); + this.consumerExtractor = propagation.extractor(TracingConsumerRequest.GETTER); + this.producerInjector = propagation.injector(TracingProducerRequest.SETTER); + this.consumerInjector = propagation.injector(TracingConsumerRequest.SETTER); + this.producerSampler = messagingTracing.producerSampler(); + this.consumerSampler = messagingTracing.consumerSampler(); + this.remoteServiceName = remoteServiceName; + + // We clear the trace ID headers, so that a stale consumer span is not preferred over current + // listener. We intentionally don't clear BaggagePropagation.allKeyNames as doing so will + // application fields "user_id" or "country_code" + this.traceIdHeaders = propagation.keys().toArray(new String[0]); + } + + TraceContextOrSamplingFlags extractAndClearTraceIdHeaders(Extractor extractor, + R request, + Map properties) { + TraceContextOrSamplingFlags extracted = extractor.extract(request); + // Clear any propagation keys present in the headers + if (extracted.samplingFlags() == null) { // then trace IDs were extracted + if (properties != null) { + clearTraceIdHeaders(properties); + } + } + return extracted; + } + + /** + * Creates a potentially noop remote span representing this request. + */ + Span nextMessagingSpan(SamplerFunction sampler, MessagingRequest request, + TraceContextOrSamplingFlags extracted) { + Boolean sampled = extracted.sampled(); + // only recreate the context if the messaging sampler made a decision + if (sampled == null && (sampled = sampler.trySample(request)) != null) { + extracted = extracted.sampled(sampled); + } + return tracer.nextSpan(extracted); + } + + // We can't just skip clearing headers we use because we might inject B3 single, yet have stale B3 + // multi, or vice versa. + void clearTraceIdHeaders(Map headers) { + for (String traceIDHeader : traceIdHeaders) + headers.remove(traceIDHeader); + } + + /** + * Extracts or creates a {@link Span.Kind#CONSUMER} span for each message received. This span is + * injected onto each message so it becomes the parent when a processor later calls {@link Tracer#nextSpan(TraceContextOrSamplingFlags)}. + */ + public MessageListenerOrderly messageListenerOrderly(MessageListenerOrderly messageListenerOrderly) { + return new TracingMessageListenerOrderly(this, messageListenerOrderly); + } + + /** + * Extracts or creates a {@link Span.Kind#CONSUMER} span for each message received. This span is + * injected onto each message so it becomes the parent when a processor later calls {@link Tracer#nextSpan(TraceContextOrSamplingFlags)}. + */ + public MessageListenerConcurrently messageListenerConcurrently(MessageListenerConcurrently messageListenerConcurrently) { + return new TracingMessageListenerConcurrently(this, messageListenerConcurrently); + } + +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingConsumerRequest.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingConsumerRequest.java new file mode 100644 index 0000000000..708e94b29d --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingConsumerRequest.java @@ -0,0 +1,80 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import brave.messaging.ConsumerRequest; +import brave.propagation.Propagation.RemoteGetter; +import brave.propagation.Propagation.RemoteSetter; +import org.apache.rocketmq.common.message.MessageExt; + +// intentionally not yet public until we add tag parsing functionality +final class TracingConsumerRequest extends ConsumerRequest { + static final RemoteGetter GETTER = + new RemoteGetter() { + @Override public Span.Kind spanKind() { + return Span.Kind.CONSUMER; + } + + @Override public String get(TracingConsumerRequest request, String name) { + return request.delegate.getUserProperty(name); + } + + @Override public String toString() { + return "MessageExt::getUserProperty"; + } + }; + + static final RemoteSetter SETTER = + new RemoteSetter() { + @Override public Span.Kind spanKind() { + return Span.Kind.CONSUMER; + } + + @Override public void put(TracingConsumerRequest request, String name, String value) { + request.delegate.putUserProperty(name, value); + } + + @Override public String toString() { + return "MessageExt::putUserProperty"; + } + }; + + final MessageExt delegate; + + TracingConsumerRequest(MessageExt delegate) { + if (delegate == null) throw new NullPointerException("delegate == null"); + this.delegate = delegate; + } + + @Override public MessageExt unwrap() { + return delegate; + } + + @Override public String operation() { + return "receive"; + } + + @Override public String channelKind() { + return "topic"; + } + + @Override public String channelName() { + return delegate.getTopic(); + } + + @Override public String messageId() { + return delegate.getMsgId(); + } +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java new file mode 100644 index 0000000000..0012496b5d --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java @@ -0,0 +1,64 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import brave.Tracer.SpanInScope; +import brave.Tracing; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.Collections; +import java.util.List; + +class TracingMessageListenerConcurrently implements MessageListenerConcurrently { + + private final RocketMQTracing tracing; + final MessageListenerConcurrently messageListenerConcurrently; + + TracingMessageListenerConcurrently(RocketMQTracing tracing, MessageListenerConcurrently messageListenerConcurrently) { + this.tracing = tracing; + this.messageListenerConcurrently = messageListenerConcurrently; + } + + @Override + public final ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + for (MessageExt msg : msgs) { + TracingConsumerRequest request = new TracingConsumerRequest(msg); + Span span = + Util.createAndStartSpan(tracing, tracing.consumerExtractor, tracing.consumerSampler, + request, msg.getProperties()); + span.name(RocketMQTags.FROM_PREFIX + msg.getTopic()); + + ConsumeConcurrentlyStatus result; + try (SpanInScope scope = tracing.tracer.withSpanInScope(span)) { + result = messageListenerConcurrently.consumeMessage(Collections.singletonList(msg), context); + } catch (Throwable e) { + span.error(e); + throw e; + } finally { + span.finish(); + } + + if (result != ConsumeConcurrentlyStatus.CONSUME_SUCCESS) { + return result; + } + } + + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java new file mode 100644 index 0000000000..884c6ce8a7 --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java @@ -0,0 +1,62 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import brave.Tracer.SpanInScope; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.Collections; +import java.util.List; + +class TracingMessageListenerOrderly implements MessageListenerOrderly { + private final RocketMQTracing tracing; + final MessageListenerOrderly messageListenerOrderly; + + TracingMessageListenerOrderly(RocketMQTracing tracing, MessageListenerOrderly messageListenerOrderly) { + this.tracing = tracing; + this.messageListenerOrderly = messageListenerOrderly; + } + + @Override + public final ConsumeOrderlyStatus consumeMessage(List msgs, + ConsumeOrderlyContext context) { + for (MessageExt msg : msgs) { + TracingConsumerRequest request = new TracingConsumerRequest(msg); + Span span = + Util.createAndStartSpan(tracing, tracing.consumerExtractor, tracing.consumerSampler, + request, msg.getProperties()); + span.name(RocketMQTags.FROM_PREFIX + msg.getTopic()); + + ConsumeOrderlyStatus result; + try (SpanInScope scope = tracing.tracer.withSpanInScope(span)) { + result = messageListenerOrderly.consumeMessage(Collections.singletonList(msg), context); + } catch (Throwable e) { + span.error(e); + throw e; + } finally { + span.finish(); + } + + if (result != ConsumeOrderlyStatus.SUCCESS) { + return result; + } + } + + return ConsumeOrderlyStatus.SUCCESS; + } +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingProducerRequest.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingProducerRequest.java new file mode 100644 index 0000000000..1e0767741d --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingProducerRequest.java @@ -0,0 +1,80 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span.Kind; +import brave.messaging.ProducerRequest; +import brave.propagation.Propagation.RemoteGetter; +import brave.propagation.Propagation.RemoteSetter; +import org.apache.rocketmq.common.message.Message; + +// intentionally not yet public until we add tag parsing functionality +final class TracingProducerRequest extends ProducerRequest { + static final RemoteGetter GETTER = + new RemoteGetter() { + @Override public Kind spanKind() { + return Kind.PRODUCER; + } + + @Override public String get(TracingProducerRequest request, String name) { + return request.delegate.getUserProperty(name); + } + + @Override public String toString() { + return "Message::getUserProperty"; + } + }; + + static final RemoteSetter SETTER = + new RemoteSetter() { + @Override public Kind spanKind() { + return Kind.PRODUCER; + } + + @Override public void put(TracingProducerRequest request, String name, String value) { + request.delegate.putUserProperty(name, value); + } + + @Override public String toString() { + return "Message::putUserProperty"; + } + }; + + final Message delegate; + + TracingProducerRequest(Message delegate) { + if (delegate == null) throw new NullPointerException("delegate == null"); + this.delegate = delegate; + } + + @Override public Message unwrap() { + return delegate; + } + + @Override public String operation() { + return "send"; + } + + @Override public String channelKind() { + return "topic"; + } + + @Override public String channelName() { + return delegate.getTopic(); + } + + @Override public String messageId() { + return null; + } +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java new file mode 100644 index 0000000000..1de6d7bb1d --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java @@ -0,0 +1,77 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.hook.SendMessageHook; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; + +final class TracingSendMessageHook implements SendMessageHook { + final RocketMQTracing tracing; + + public TracingSendMessageHook(RocketMQTracing tracing) { + this.tracing = tracing; + } + + @Override + public String hookName() { + return "TracingSendMessageHook"; + } + + @Override + public void sendMessageBefore(SendMessageContext context) { + if (context == null || context.getMessage() == null) { + return; + } + + Message msg = context.getMessage(); + TracingProducerRequest request = new TracingProducerRequest(msg); + Span span = + Util.createAndStartSpan(tracing, tracing.producerExtractor, tracing.producerSampler, + request, + msg.getProperties()); + span.name(RocketMQTags.TO_PREFIX + msg.getTopic()); + if (msg.getTags() != null && !msg.getTags().isEmpty()) { + span.tag(RocketMQTags.ROCKETMQ_TAGS, msg.getTags()); + } + context.setMqTraceContext(span); + tracing.producerInjector.inject(span.context(), request); + } + + @Override + public void sendMessageAfter(SendMessageContext context) { + if (context == null || context.getMessage() == null || context.getMqTraceContext() == null) { + return; + } + + SendResult sendResult = context.getSendResult(); + Span span = (Span) context.getMqTraceContext(); + TracingProducerRequest request = new TracingProducerRequest(context.getMessage()); + + if (sendResult == null) { + if (context.getCommunicationMode() == CommunicationMode.ASYNC) { + return; + } + span.finish(); + tracing.producerInjector.inject(span.context(), request); + return; + } + + tracing.producerInjector.inject(span.context(), request); + span.finish(); + } +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/Util.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/Util.java new file mode 100644 index 0000000000..ed4476051a --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/Util.java @@ -0,0 +1,49 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import brave.Tracer; +import brave.messaging.MessagingRequest; +import brave.propagation.CurrentTraceContext; +import brave.propagation.TraceContext; +import brave.propagation.TraceContextOrSamplingFlags; +import brave.sampler.SamplerFunction; +import java.util.Map; + +class Util { + static Span createAndStartSpan(RocketMQTracing tracing, + TraceContext.Extractor extractor, SamplerFunction sampler, T request, + Map props) { + Tracer tracer = tracing.tracer; + CurrentTraceContext currentTraceContext = tracing.tracing.currentTraceContext(); + TraceContext traceContext = currentTraceContext.get(); + Span span; + + if (traceContext == null) { + TraceContextOrSamplingFlags extracted = + tracing.extractAndClearTraceIdHeaders(extractor, request, props); + span = tracing.nextMessagingSpan(sampler, request, extracted); + } else { + span = tracer.newChild(traceContext); + } + + span.kind(request.spanKind()); + span.remoteServiceName(tracing.remoteServiceName); + span.tag(RocketMQTags.ROCKETMQ_TOPIC, request.channelName()); + span.start(); + return span; + } + +} diff --git a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java new file mode 100644 index 0000000000..235f71e5a6 --- /dev/null +++ b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java @@ -0,0 +1,223 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import brave.Span; +import brave.handler.MutableSpan; +import brave.messaging.MessagingRequest; +import brave.messaging.MessagingTracing; +import brave.sampler.Sampler; +import brave.sampler.SamplerFunction; +import brave.sampler.SamplerFunctions; +import brave.test.ITRemote; +import brave.test.IntegrationTestSpanHandler; + +@Tag("docker") +@Testcontainers(disabledWithoutDocker = true) +@Timeout(60) +class ITRocketMQTracingTest extends ITRemote { + @Container static RocketMQContainer rocketMQ = new RocketMQContainer(); + + IntegrationTestSpanHandler producerSpanHandler = new IntegrationTestSpanHandler(); + IntegrationTestSpanHandler consumerSpanHandler = new IntegrationTestSpanHandler(); + + SamplerFunction producerSampler = SamplerFunctions.deferDecision(); + SamplerFunction consumerSampler = SamplerFunctions.deferDecision(); + + RocketMQTracing producerTracing = + RocketMQTracing.create(MessagingTracing + .newBuilder( + tracingBuilder(Sampler.ALWAYS_SAMPLE).localServiceName("producer").clearSpanHandlers() + .addSpanHandler(producerSpanHandler).build()) + .producerSampler(r -> producerSampler.trySample(r)).build()); + + RocketMQTracing consumerTracing = + RocketMQTracing.create(MessagingTracing + .newBuilder( + tracingBuilder(Sampler.ALWAYS_SAMPLE).localServiceName("consumer").clearSpanHandlers() + .addSpanHandler(consumerSpanHandler).build()) + .consumerSampler(r -> consumerSampler.trySample(r)).build()); + + @Test void send() throws Exception { + String topic = "testSend"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + DefaultMQProducer producer = new DefaultMQProducer("testSend"); + // TODO: what is this deprecated in favor of? + producer.getDefaultMQProducerImpl() + .registerSendMessageHook(new TracingSendMessageHook(producerTracing)); + producer.setNamesrvAddr(rocketMQ.getNamesrvAddr()); + producer.start(); + producer.send(message); + + producer.shutdown(); + + MutableSpan producerSpan = producerSpanHandler.takeRemoteSpan(Span.Kind.PRODUCER); + assertThat(producerSpan.parentId()).isNull(); + } + + @Test void sendOneway() throws Exception { + String topic = "testSendOneway"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + DefaultMQProducer producer = new DefaultMQProducer("testSendOneway"); + producer.getDefaultMQProducerImpl() + .registerSendMessageHook(new TracingSendMessageHook(producerTracing)); + producer.setNamesrvAddr(rocketMQ.getNamesrvAddr()); + producer.start(); + producer.sendOneway(message); + + producer.shutdown(); + + MutableSpan producerSpan = producerSpanHandler.takeRemoteSpan(Span.Kind.PRODUCER); + assertThat(producerSpan.parentId()).isNull(); + } + + @Test void sendAsync() throws Exception { + String topic = "testSendAsync"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + DefaultMQProducer producer = new DefaultMQProducer("testSendAsync"); + producer.getDefaultMQProducerImpl() + .registerSendMessageHook(new TracingSendMessageHook(producerTracing)); + producer.setNamesrvAddr(rocketMQ.getNamesrvAddr()); + producer.start(); + producer.send(message, new SendCallback() { + @Override public void onSuccess(SendResult sendResult) { + } + + @Override public void onException(Throwable e) { + + } + }); + + MutableSpan producerSpan = producerSpanHandler.takeRemoteSpan(Span.Kind.PRODUCER); + producer.shutdown(); + assertThat(producerSpan.parentId()).isNull(); + } + + @Test void tracingMessageListenerConcurrently() throws Exception { + String topic = "tracingMessageListenerConcurrently"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + String nameserverAddr = rocketMQ.getNamesrvAddr(); + DefaultMQProducer producer = new DefaultMQProducer("tracingMessageListenerConcurrently"); + producer.setNamesrvAddr(nameserverAddr); + producer.start(); + + DefaultMQPushConsumer consumer = + new DefaultMQPushConsumer("tracingMessageListenerConcurrently"); + consumer.setNamesrvAddr(nameserverAddr); + consumer.subscribe(topic, "*"); + MessageListenerConcurrently messageListenerConcurrently = consumerTracing.messageListenerConcurrently(new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + consumer.registerMessageListener(messageListenerConcurrently); + producer.send(message); + consumer.start(); + + MutableSpan consumerSpan = consumerSpanHandler.takeRemoteSpan(Span.Kind.CONSUMER); + + producer.shutdown(); + consumer.shutdown(); + + assertThat(consumerSpan.parentId()).isNull(); + } + + @Test void tracingMessageListenerOrderly() throws Exception { + String topic = "tracingMessageListenerOrderly"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + String nameserverAddr = rocketMQ.getNamesrvAddr(); + DefaultMQProducer producer = new DefaultMQProducer("tracingMessageListenerOrderly"); + producer.setNamesrvAddr(nameserverAddr); + producer.start(); + + DefaultMQPushConsumer consumer = + new DefaultMQPushConsumer("tracingMessageListenerOrderly"); + consumer.setNamesrvAddr(nameserverAddr); + consumer.subscribe(topic, "*"); + MessageListenerOrderly messageListenerOrderly = consumerTracing.messageListenerOrderly(new MessageListenerOrderly() { + @Override + public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) { + return ConsumeOrderlyStatus.SUCCESS; + } + }); + consumer.registerMessageListener(messageListenerOrderly); + producer.send(message); + consumer.start(); + + MutableSpan consumerSpan = consumerSpanHandler.takeRemoteSpan(Span.Kind.CONSUMER); + + producer.shutdown(); + consumer.shutdown(); + + assertThat(consumerSpan.parentId()).isNull(); + } + + @Test void all() throws Exception { + String topic = "testAll"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + String nameserverAddr = rocketMQ.getNamesrvAddr(); + DefaultMQProducer producer = new DefaultMQProducer("testAll"); + producer.getDefaultMQProducerImpl() + .registerSendMessageHook(new TracingSendMessageHook(producerTracing)); + producer.setNamesrvAddr(nameserverAddr); + producer.start(); + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testAll"); + consumer.setNamesrvAddr(nameserverAddr); + consumer.subscribe(topic, "*"); + MessageListenerOrderly messageListenerOrderly = consumerTracing.messageListenerOrderly(new MessageListenerOrderly() { + @Override + public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) { + return ConsumeOrderlyStatus.SUCCESS; + } + }); + consumer.registerMessageListener(messageListenerOrderly); + + producer.send(message); + consumer.start(); + + MutableSpan producerSpan = producerSpanHandler.takeRemoteSpan(Span.Kind.PRODUCER); + MutableSpan consumerSpan = consumerSpanHandler.takeRemoteSpan(Span.Kind.CONSUMER); + + producer.shutdown(); + consumer.shutdown(); + + assertThat(producerSpan.parentId()).isNull(); + assertThat(consumerSpan.parentId()).isNotNull(); + assertChildOf(consumerSpan, producerSpan); + } +} diff --git a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java new file mode 100644 index 0000000000..5f13d0a55a --- /dev/null +++ b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java @@ -0,0 +1,66 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +final class RocketMQContainer extends GenericContainer { + static final int NAMESERVER_PORT = 9876; + static final int BROKER_PORT = 10911; + + RocketMQContainer() { + super(DockerImageName.parse("apache/rocketmq:5.1.4")); + List portBindings = new ArrayList<>(); + portBindings.add(String.format("%d:%d", NAMESERVER_PORT, NAMESERVER_PORT)); + portBindings.add(String.format("%d:%d", BROKER_PORT, BROKER_PORT)); + setPortBindings(portBindings); + + // do not publish all ports + withCreateContainerCmdModifier(cmd -> { + if (cmd.getHostConfig() != null) { + cmd.getHostConfig().withPublishAllPorts(false); + } + }); + + setCommand("sh /start.sh"); + this.waitStrategy = + Wait.forLogMessage(".*--JoeKerouac--.*", 1).withStartupTimeout(Duration.ofSeconds(60)); + + mount("broker.conf"); + mount("start.sh"); + } + + private void mount(String fileName) { + URL confUrl = getClass().getClassLoader().getResource(fileName); + try { + String confPath = new File(confUrl.toURI()).getAbsolutePath(); + withFileSystemBind(confPath, "/" + fileName, BindMode.READ_ONLY); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + public String getNamesrvAddr() { + return getHost() + ":" + NAMESERVER_PORT; + } +} diff --git a/instrumentation/rocketmq-client/src/test/resources/broker.conf b/instrumentation/rocketmq-client/src/test/resources/broker.conf new file mode 100644 index 0000000000..4948380251 --- /dev/null +++ b/instrumentation/rocketmq-client/src/test/resources/broker.conf @@ -0,0 +1,3 @@ +brokerName=JoeKerouac-Test +brokerIP1=127.0.0.1 +autoCreateTopicEnable=true diff --git a/instrumentation/rocketmq-client/src/test/resources/log4j2.properties b/instrumentation/rocketmq-client/src/test/resources/log4j2.properties new file mode 100644 index 0000000000..e6988c4497 --- /dev/null +++ b/instrumentation/rocketmq-client/src/test/resources/log4j2.properties @@ -0,0 +1,9 @@ +appenders=console +appender.console.type=Console +appender.console.name=STDOUT +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{ABSOLUTE} %-5p [%t] %C{2} (%F:%L) - %m%n +rootLogger.level=warn +rootLogger.appenderRefs=stdout +rootLogger.appenderRef.stdout.ref=STDOUT + diff --git a/instrumentation/rocketmq-client/src/test/resources/start.sh b/instrumentation/rocketmq-client/src/test/resources/start.sh new file mode 100644 index 0000000000..d1b11643eb --- /dev/null +++ b/instrumentation/rocketmq-client/src/test/resources/start.sh @@ -0,0 +1,41 @@ +function check() { + for i in {1..10} + do + if grep -q "$1" $2; then + break + else + sleep 1 + fi + done +} + +function createTopic() { + sh /home/rocketmq/rocketmq-5.1.4/bin/mqadmin updateTopic -n 127.0.0.1:9876 -b 127.0.0.1:10911 -t $1 +} + +cleanup() { + jps | grep -v Jps | awk '{print $1}' | xargs -I {} kill {} + exit 0 +} + +trap cleanup SIGINT SIGTERM + +sh /home/rocketmq/rocketmq-5.1.4/bin/mqnamesrv > ~/ns.log 2>&1 & +check "The Name Server boot success" ~/ns.log + +sh /home/rocketmq/rocketmq-5.1.4/bin/mqbroker -n 127.0.0.1:9876 -c /broker.conf > ~/broker.log 2>&1 & +check "boot success" ~/broker.log + +createTopic testSend +createTopic testSendOneway +createTopic testSendAsync +createTopic tracingMessageListenerConcurrently +createTopic tracingMessageListenerOrderly +createTopic testAll + +echo "--JoeKerouac--" + +while true +do + sleep 1 +done