Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add instrumentation for rocketmq #2263

Merged
merged 57 commits into from
Mar 11, 2021
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
535b95b
Merge pull request #1 from open-telemetry/main
addname Feb 12, 2021
0d1fbed
add rocketmq support
addname Feb 12, 2021
cb059ab
Merge branch 'main'
addname Feb 12, 2021
2f2cfcf
merge main
addname Feb 12, 2021
d3fd46b
Merge branch 'main'
addname Feb 15, 2021
bce0921
modify tests
addname Feb 16, 2021
924b2e3
modify tests
addname Feb 16, 2021
fb41154
modify tests
addname Feb 16, 2021
451d3a5
modify style
addname Feb 16, 2021
24d1d4a
modify style
addname Feb 16, 2021
fd9f3a4
modify style
addname Feb 16, 2021
06ea7bc
modify style
addname Feb 16, 2021
f40b086
modify tests
addname Feb 16, 2021
bad5c16
modify tests
addname Feb 16, 2021
73bbefb
modify tests
addname Feb 16, 2021
06175b2
modify tests
addname Feb 16, 2021
fb3dc3d
Merge branch 'main'
addname Feb 18, 2021
a7f764c
Merge branch 'main'
addname Feb 24, 2021
66617dc
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
addname Feb 28, 2021
4094f4a
Use hooks to register in the iavaagent instrumentation
addname Feb 28, 2021
558acf7
Use hooks to register in the iavaagent instrumentation
addname Feb 28, 2021
310c268
Use hooks to register in the iavaagent instrumentation
addname Feb 28, 2021
c98b828
Use hooks to register in the iavaagent instrumentation
addname Feb 28, 2021
1ea63af
Use hooks to register in the iavaagent instrumentation
addname Feb 28, 2021
449b8e2
Use hooks to register in the iavaagent instrumentation
addname Feb 28, 2021
55aa9f2
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
addname Mar 5, 2021
a92c9b3
Fix
addname Mar 6, 2021
b5ba2ca
Revert "Use hooks to register in the iavaagent instrumentation"
addname Mar 6, 2021
0887762
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
addname Mar 6, 2021
48de0f7
Fix
addname Mar 6, 2021
ad8cbc7
Fix
addname Mar 6, 2021
f28eee6
Fix
addname Mar 6, 2021
81c3720
Fix
addname Mar 6, 2021
1c2c71f
Fix
addname Mar 6, 2021
0ed11a0
Fix
addname Mar 6, 2021
c32bc40
Fix
addname Mar 6, 2021
460092a
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
addname Mar 9, 2021
0489ef9
Fix
addname Mar 9, 2021
963e012
Fix
addname Mar 9, 2021
00aa1fa
Fix
addname Mar 9, 2021
5419081
Fix
addname Mar 9, 2021
8419db8
Fix
addname Mar 9, 2021
aa2c7b5
Fix
addname Mar 9, 2021
8c606f6
Update settings.gradle
anuraaga Mar 9, 2021
4a00e83
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
addname Mar 10, 2021
11aa520
Fix
addname Mar 10, 2021
9ffde5d
Merge remote-tracking branch 'origin/issue#1916' into issue#1916
addname Mar 10, 2021
6ad79eb
Fix
addname Mar 10, 2021
24de0ca
Fix
addname Mar 10, 2021
3aac1a5
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
addname Mar 10, 2021
b4eb7c3
Fix
addname Mar 10, 2021
58801ed
Fix
addname Mar 10, 2021
398246b
Fix
addname Mar 10, 2021
5b45a6a
Fix
addname Mar 10, 2021
f873db7
Fix
addname Mar 10, 2021
c0c4c9d
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
addname Mar 10, 2021
28f30fc
Fix
addname Mar 10, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
apply from: "$rootDir/gradle/instrumentation.gradle"

muzzle {
pass {
group = "org.apache.rocketmq"
module = 'rocketmq-client'
versions = "[4.8.0,)"
mateuszrzeszutek marked this conversation as resolved.
Show resolved Hide resolved
}
}
dependencies {
library group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.8.0'
implementation project(':instrumentation:rocketmq-client-4.8:library')
testImplementation project(':instrumentation:rocketmq-client-4.8:testing')

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmq;

import static io.opentelemetry.instrumentation.rocketmq.RocketMqProducerTracer.tracer;
import static io.opentelemetry.instrumentation.rocketmq.TextMapInjectAdapter.SETTER;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.rocketmq.SendCallbackWrapper;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;

public class RocketMqClientApiImplInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.rocketmq.client.impl.MQClientAPIImpl");
}

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod().and(named("sendMessage")).and(takesArguments(12)),
RocketMqClientApiImplInstrumentation.class.getName() + "$SendMessageAdvice");
}

public static class SendMessageAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) String addr,
@Advice.Argument(value = 2, readOnly = false) Message msg,
@Advice.Argument(value = 3, readOnly = false) SendMessageRequestHeader requestHeader,
@Advice.Argument(value = 6, readOnly = false) SendCallback sendCallback,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelScope") Scope scope) {

Context parent = Java8BytecodeBridge.currentContext();
span = tracer().startProducerSpan(addr, msg);
Context newContext = parent.with(span);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a lot of examples in this repo that still return Span from tracers' startSpan methods, but usually in the newer instrumentations we're trying to return Context instead. You can find some very simple examples of that in the BaseTracer

try {
Java8BytecodeBridge.getGlobalPropagators()
.getTextMapPropagator()
.inject(newContext, requestHeader, SETTER);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, wouldn't it be worth it to check RocketMqClientConfig.isPropagationEnabled() here as well?

} catch (IllegalStateException e) {
requestHeader = new SendMessageRequestHeader();
requestHeader.getBornTimestamp();
requestHeader.getDefaultTopic();
requestHeader.getDefaultTopicQueueNums();
requestHeader.getFlag();
requestHeader.getProducerGroup();
requestHeader.getMaxReconsumeTimes();
requestHeader.getProperties();
requestHeader.getSysFlag();
requestHeader.getTopic();
requestHeader.getQueueId();
requestHeader.getReconsumeTimes();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe to override the header this way? Won't the users lose some information?

Java8BytecodeBridge.getGlobalPropagators()
.getTextMapPropagator()
.inject(newContext, requestHeader, SETTER);
}

scope = newContext.makeCurrent();
if (sendCallback != null) {
sendCallback = new SendCallbackWrapper(sendCallback, span);
}
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Argument(value = 6, readOnly = false) SendCallback sendCallback,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelScope") Scope scope) {
if (scope == null) {
return;
}
scope.close();
if (sendCallback == null) {
if (throwable == null) {
tracer().end(span);
} else {
tracer().endExceptionally(span, throwable);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmq;

import static io.opentelemetry.instrumentation.rocketmq.RocketMqConsumerTracer.tracer;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;

public class RocketMqConcurrentlyConsumeInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return implementsInterface(
named("org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently"));
}

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod().and(named("consumeMessage")),
RocketMqConcurrentlyConsumeInstrumentation.class.getName() + "$ConcurrentlyConsumeAdvice");
}

public static class ConcurrentlyConsumeAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(0) List<MessageExt> msgs,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelScope") Scope scope) {

span = tracer().startSpan(msgs);
scope = span.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Return ConsumeConcurrentlyStatus status,
@Advice.Thrown Throwable throwable,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelScope") Scope scope) {
if (scope == null) {
return;
}
tracer().endConcurrentlySpan(span, status);
scope.close();
if (throwable == null) {
tracer().end(span);
} else {
tracer().endExceptionally(span, throwable);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmq;

import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class RocketMqInstrumentationModule extends InstrumentationModule {
public RocketMqInstrumentationModule() {
super("rocketmq", "rockemq-client-4.3");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the name of the lib rocketmq-client? That's what we should put as the main instrumentation name then.
(Also, you had a typo in the second instrumentation name)

Suggested change
super("rocketmq", "rockemq-client-4.3");
super("rocketmq-client", "rocketmq-client-4.3");

}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new RocketMqClientApiImplInstrumentation(),
new RocketMqConcurrentlyConsumeInstrumentation(),
new RocketMqOrderlyConsumeInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmq;

import static io.opentelemetry.instrumentation.rocketmq.RocketMqConsumerTracer.tracer;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.common.message.MessageExt;

public class RocketMqOrderlyConsumeInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return implementsInterface(
named("org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whenever we use implements/extends type matcher we always add a classLoaderOptimization() method to the type instrumentation, for example: ApacheHttpAsyncClientInstrumentation
The reason for that is described in detail in the TypeInstrumentation Javadoc, but the here's the short version: implements/extends checks are very expensive and before trying to apply the type instrumentation it is worth checking whether the classloader contains the interface class at all.

}

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
nameStartsWith("consumeMessage"),
RocketMqOrderlyConsumeInstrumentation.class.getName() + "$OrderlyConsumeAdvice");
}

public static class OrderlyConsumeAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(0) List<MessageExt> msgs,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelScope") Scope scope) {

span = tracer().startSpan(msgs);
scope = span.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Return ConsumeOrderlyStatus status,
@Advice.Thrown Throwable throwable,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelScope") Scope scope) {
if (scope == null) {
return;
}
tracer().endOrderlySpan(span, status);
scope.close();
if (throwable == null) {
tracer().end(span);
} else {
tracer().endExceptionally(span, throwable);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.rocketmq

import io.opentelemetery.instrumentation.rocketmq.AbstractRocketMqClientTest
import io.opentelemetry.instrumentation.test.AgentTestTrait

class RocketMqClientTest extends AbstractRocketMqClientTest implements AgentTestTrait {


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apply from: "$rootDir/gradle/instrumentation-library.gradle"


dependencies {
library group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.8.0'
testImplementation project(':instrumentation:rocketmq-client-4.8:testing')

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.rocketmq;

import io.opentelemetry.instrumentation.api.config.Config;

public final class RocketMqClientConfig {

public static boolean isPropagationEnabled() {
return Config.get()
.getBooleanProperty("otel.instrumentation.rocketmq.client-propagation", true);
}

private RocketMqClientConfig() {}
}
Loading