Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add instrumentation for rocketmq #2263

Merged
merged 57 commits into from
Mar 11, 2021
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
535b95b
Merge pull request #1 from open-telemetry/main
addname Feb 12, 2021
0d1fbed
add rocketmq support
addname Feb 12, 2021
cb059ab
Merge branch 'main'
addname Feb 12, 2021
2f2cfcf
merge main
addname Feb 12, 2021
d3fd46b
Merge branch 'main'
addname Feb 15, 2021
bce0921
modify tests
addname Feb 16, 2021
924b2e3
modify tests
addname Feb 16, 2021
fb41154
modify tests
addname Feb 16, 2021
451d3a5
modify style
addname Feb 16, 2021
24d1d4a
modify style
addname Feb 16, 2021
fd9f3a4
modify style
addname Feb 16, 2021
06ea7bc
modify style
addname Feb 16, 2021
f40b086
modify tests
addname Feb 16, 2021
bad5c16
modify tests
addname Feb 16, 2021
73bbefb
modify tests
addname Feb 16, 2021
06175b2
modify tests
addname Feb 16, 2021
fb3dc3d
Merge branch 'main'
addname Feb 18, 2021
a7f764c
Merge branch 'main'
addname Feb 24, 2021
66617dc
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
addname Feb 28, 2021
4094f4a
Use hooks to register in the iavaagent instrumentation
addname Feb 28, 2021
558acf7
Use hooks to register in the iavaagent instrumentation
addname Feb 28, 2021
310c268
Use hooks to register in the iavaagent instrumentation
addname Feb 28, 2021
c98b828
Use hooks to register in the iavaagent instrumentation
addname Feb 28, 2021
1ea63af
Use hooks to register in the iavaagent instrumentation
addname Feb 28, 2021
449b8e2
Use hooks to register in the iavaagent instrumentation
addname Feb 28, 2021
55aa9f2
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
addname Mar 5, 2021
a92c9b3
Fix
addname Mar 6, 2021
b5ba2ca
Revert "Use hooks to register in the iavaagent instrumentation"
addname Mar 6, 2021
0887762
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
addname Mar 6, 2021
48de0f7
Fix
addname Mar 6, 2021
ad8cbc7
Fix
addname Mar 6, 2021
f28eee6
Fix
addname Mar 6, 2021
81c3720
Fix
addname Mar 6, 2021
1c2c71f
Fix
addname Mar 6, 2021
0ed11a0
Fix
addname Mar 6, 2021
c32bc40
Fix
addname Mar 6, 2021
460092a
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
addname Mar 9, 2021
0489ef9
Fix
addname Mar 9, 2021
963e012
Fix
addname Mar 9, 2021
00aa1fa
Fix
addname Mar 9, 2021
5419081
Fix
addname Mar 9, 2021
8419db8
Fix
addname Mar 9, 2021
aa2c7b5
Fix
addname Mar 9, 2021
8c606f6
Update settings.gradle
anuraaga Mar 9, 2021
4a00e83
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
addname Mar 10, 2021
11aa520
Fix
addname Mar 10, 2021
9ffde5d
Merge remote-tracking branch 'origin/issue#1916' into issue#1916
addname Mar 10, 2021
6ad79eb
Fix
addname Mar 10, 2021
24de0ca
Fix
addname Mar 10, 2021
3aac1a5
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
addname Mar 10, 2021
b4eb7c3
Fix
addname Mar 10, 2021
58801ed
Fix
addname Mar 10, 2021
398246b
Fix
addname Mar 10, 2021
5b45a6a
Fix
addname Mar 10, 2021
f873db7
Fix
addname Mar 10, 2021
c0c4c9d
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
addname Mar 10, 2021
28f30fc
Fix
addname Mar 10, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,4 @@ jobs:
env:
S3_BUILD_CACHE_ACCESS_KEY_ID: ${{ secrets.S3_BUILD_CACHE_ACCESS_KEY_ID }}
S3_BUILD_CACHE_SECRET_KEY: ${{ secrets.S3_BUILD_CACHE_SECRET_KEY }}
run: ./gradlew test -PtestJavaVersion=${{ matrix.java }} --stacktrace -Porg.gradle.java.installations.paths=${{ steps.setup-test-java.outputs.path }} -Porg.gradle.java.installations.auto-download=false
run: ./gradlew test -PtestJavaVersion=${{ matrix.java }} --stacktrace -Porg.gradle.java.installations.paths=${{ steps.setup-test-java.outputs.path }} -Porg.gradle.java.installations.auto-download=false
addname marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apply from: "$rootDir/gradle/instrumentation.gradle"

muzzle {
pass {
group = "org.apache.rocketmq"
module = 'rocketmq-client'
versions = "[4.8.0,)"
mateuszrzeszutek marked this conversation as resolved.
Show resolved Hide resolved
assertInverse = true
}
}

dependencies {
library group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.8.0'
implementation project(':instrumentation:rocketmq-client-4.8:library')
testImplementation project(':instrumentation:rocketmq-client-4.8:testing')

}

tasks.withType(Test) {
jvmArgs "-Dotel.instrumentation.rocketmq.client.experimental-span-attributes=true"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmq;

import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.ClassLoaderMatcher.hasClassesNamed;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.instrumentation.rocketmq.TracingConsumeMessageHookImpl;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;

public class RocketMqConsumerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<ClassLoader> classLoaderOptimization() {
addname marked this conversation as resolved.
Show resolved Hide resolved
return hasClassesNamed("org.apache.rocketmq.client.consumer.DefaultMQPushConsumer");
}

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.rocketmq.client.consumer.DefaultMQPushConsumer");
}

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod().and(named("start")).and(takesArguments(0)),
RocketMqConsumerInstrumentation.class.getName() + "$AdviceStart");
}

public static class AdviceStart {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.FieldValue(
value = "defaultMQPushConsumerImpl",
declaringType = DefaultMQPushConsumer.class)
DefaultMQPushConsumerImpl defaultMqPushConsumerImpl) {
defaultMqPushConsumerImpl.registerConsumeMessageHook(new TracingConsumeMessageHookImpl());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmq;

import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class RocketMqInstrumentationModule extends InstrumentationModule {
public RocketMqInstrumentationModule() {
super("rocketmq-client", "rocketmq-client-4.8");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(new RocketMqProducerInstrumentation(), new RocketMqConsumerInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmq;

import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.ClassLoaderMatcher.hasClassesNamed;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.instrumentation.rocketmq.TracingSendMessageHookImpl;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class RocketMqProducerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<ClassLoader> classLoaderOptimization() {
addname marked this conversation as resolved.
Show resolved Hide resolved
return hasClassesNamed("org.apache.rocketmq.client.producer.DefaultMQProducer");
}

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.rocketmq.client.producer.DefaultMQProducer");
}

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod().and(named("start")).and(takesArguments(0)),
RocketMqProducerInstrumentation.class.getName() + "$AdviceStart");
}

public static class AdviceStart {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.FieldValue(value = "defaultMQProducerImpl", declaringType = DefaultMQProducer.class)
DefaultMQProducerImpl defaultMqProducerImpl) {
defaultMqProducerImpl.registerSendMessageHook(new TracingSendMessageHookImpl());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.rocketmq

import io.opentelemetery.instrumentation.rocketmq.AbstractRocketMqClientTest
import io.opentelemetry.instrumentation.test.AgentTestTrait

class RocketMqClientTest extends AbstractRocketMqClientTest implements AgentTestTrait {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
apply from: "$rootDir/gradle/instrumentation-library.gradle"

dependencies {
library group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.8.0'
testImplementation project(':instrumentation:rocketmq-client-4.8:testing')

}

tasks.withType(Test) {
jvmArgs "-Dotel.instrumentation.rocketmq.client.experimental-span-attributes=true"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.rocketmq;

import io.opentelemetry.instrumentation.api.config.Config;

public final class RocketMqClientConfig {

public static boolean isPropagationEnabled() {
return Config.get()
.getBooleanProperty("otel.instrumentation.rocketmq.client-propagation", true);
}

public static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES =
Config.get()
.getBooleanProperty(
"otel.instrumentation.rocketmq.client.experimental-span-attributes", false);
addname marked this conversation as resolved.
Show resolved Hide resolved

private RocketMqClientConfig() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.rocketmq;

import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
import static io.opentelemetry.instrumentation.rocketmq.TextMapExtractAdapter.GETTER;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.List;
import org.apache.rocketmq.common.message.MessageExt;

public class RocketMqConsumerTracer extends BaseTracer {
addname marked this conversation as resolved.
Show resolved Hide resolved

private static final RocketMqConsumerTracer TRACER = new RocketMqConsumerTracer();

public static RocketMqConsumerTracer tracer() {
addname marked this conversation as resolved.
Show resolved Hide resolved
return TRACER;
}

addname marked this conversation as resolved.
Show resolved Hide resolved
@Override
protected String getInstrumentationName() {
return "io.opentelemetry.javaagent.rocketmq-client";
}

public Context startSpan(Context parentContext, List<MessageExt> msgs) {
addname marked this conversation as resolved.
Show resolved Hide resolved
MessageExt msg = msgs.get(0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if there are >1 messages? The rest won't be traced at all?

Copy link
Member Author

@addname addname Feb 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When msgs. size( )>1, do we create one span or several span?
1.We create several span,but their "SemanticAttributes" vaules are the same.
2. Create one span, whose span name is marked that this span processes multiple data, for example : test-topic process size(or batch) 10 .
I think the second one is better. What's your opinion?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think that messages may have different attributes, like different message id or queue offset.
Maybe we can do it the same way AWS SQS lambda handler instrumentation does in this scenario:

  • create one root span for the whole message listener (without a parent);
  • for each message create a child span and add a link to the producer span (extracted from the message headers).

This way each message will get its own span containing a link to propagated producer span, and the whole consumer processing will be wrapped in a single span. WDYT? @anuraaga

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, and you can check out TracingSqsEventHandler, TracingSqsMessageHandler and AwsLambdaMessageTracer for an example of this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (msgs.size() == 1) {
SpanBuilder spanBuilder = startSpanBuilder(msg).setParent(extractParent(msg));
return withClientSpan(parentContext, spanBuilder.startSpan());
addname marked this conversation as resolved.
Show resolved Hide resolved
} else {
SpanBuilder spanBuilder =
tracer
.spanBuilder(msg.getTopic() + " receive")
addname marked this conversation as resolved.
Show resolved Hide resolved
.setSpanKind(CONSUMER)
.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq")
.setAttribute(SemanticAttributes.MESSAGING_OPERATION, "receive");
Context rootContext = withClientSpan(parentContext, spanBuilder.startSpan());
addname marked this conversation as resolved.
Show resolved Hide resolved
for (MessageExt message : msgs) {
createChildSpan(rootContext, message);
}
return rootContext;
}
}

public void createChildSpan(Context parentContext, MessageExt msg) {
addname marked this conversation as resolved.
Show resolved Hide resolved
SpanBuilder childSpanBuilder =
startSpanBuilder(msg)
.setParent(parentContext)
.addLink(Span.fromContext(extractParent(msg)).getSpanContext());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this particular case if the propagation is turned off you shouldn't add a link - it'll end up pointing to the parentContext anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I filed this spec issue we may be able to handle this in the SDK too open-telemetry/opentelemetry-specification#1492

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave this code as is, next SDK will handle invalid links for us

end(withClientSpan(parentContext, childSpanBuilder.startSpan()));
addname marked this conversation as resolved.
Show resolved Hide resolved
}

public SpanBuilder startSpanBuilder(MessageExt msg) {
SpanBuilder spanBuilder =
tracer
.spanBuilder(spanNameOnConsume(msg))
.setSpanKind(CONSUMER)
.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq")
.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, msg.getTopic())
.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")
.setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process")
.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, msg.getMsgId())
.setAttribute(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
(long) msg.getBody().length);
onConsume(spanBuilder, msg);
return spanBuilder;
}

private Context extractParent(MessageExt msg) {
if (RocketMqClientConfig.isPropagationEnabled()) {
return extract(msg.getProperties(), GETTER);
} else {
return Context.current();
}
}

void onConsume(SpanBuilder spanBuilder, MessageExt msg) {
if (RocketMqClientConfig.CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {
spanBuilder.setAttribute("messaging.rocketmq.tags", msg.getTags());
spanBuilder.setAttribute("messaging.rocketmq.queue_id", msg.getQueueId());
spanBuilder.setAttribute("messaging.rocketmq.queue_offset", msg.getQueueOffset());
spanBuilder.setAttribute("messaging.rocketmq.broker_address", getBrokerHost(msg));
}
}

String spanNameOnConsume(MessageExt msg) {
return msg.getTopic() + " process";
}

String getBrokerHost(MessageExt msg) {
addname marked this conversation as resolved.
Show resolved Hide resolved
if (msg.getStoreHost() != null) {
return msg.getStoreHost().toString().replace("/", "");
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.rocketmq;

import static io.opentelemetry.api.trace.SpanKind.PRODUCER;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class RocketMqProducerTracer extends BaseTracer {
addname marked this conversation as resolved.
Show resolved Hide resolved

private static final RocketMqProducerTracer TRACER = new RocketMqProducerTracer();

public static RocketMqProducerTracer tracer() {
return TRACER;
}

@Override
protected String getInstrumentationName() {
return "io.opentelemetry.javaagent.rocketmq-client";
}

public Context startProducerSpan(String addr, Message msg, Context parentContext) {
SpanBuilder spanBuilder = spanBuilder(spanNameOnProduce(msg), PRODUCER);
addname marked this conversation as resolved.
Show resolved Hide resolved
onProduce(spanBuilder, msg, addr);
return withClientSpan(parentContext, spanBuilder.startSpan());
addname marked this conversation as resolved.
Show resolved Hide resolved
}

public void onCallback(Context context, SendResult sendResult) {
addname marked this conversation as resolved.
Show resolved Hide resolved
if (RocketMqClientConfig.CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {
Span span = Span.fromContext(context);
span.setAttribute("messaging.rocketmq.callback_result", sendResult.getSendStatus().name());
}
}

private void onProduce(SpanBuilder spanBuilder, Message msg, String addr) {
spanBuilder.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq");
spanBuilder.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic");
spanBuilder.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, msg.getTopic());
if (RocketMqClientConfig.CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {
spanBuilder.setAttribute("messaging.rocketmq.tags", msg.getTags());
spanBuilder.setAttribute("messaging.rocketmq.broker_address", addr);
}
}

public void afterProduce(Context context, SendResult sendResult) {
Span span = Span.fromContext(context);
span.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, sendResult.getMsgId());
if (RocketMqClientConfig.CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {
span.setAttribute("messaging.rocketmq.send_result", sendResult.getSendStatus().name());
}
}

private String spanNameOnProduce(Message msg) {
return msg.getTopic() + " send";
}
}
Loading