Skip to content

Commit 7d6a41b

Browse files
Merge pull request #2156 from DataDog/landerson/datastax-4
Datastax Cassandra 4 instrumentation
2 parents 58af33d + 165b840 commit 7d6a41b

File tree

7 files changed

+548
-0
lines changed

7 files changed

+548
-0
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
ext {
2+
minJavaVersionForTests = JavaVersion.VERSION_1_8
3+
}
4+
5+
muzzle {
6+
pass {
7+
group = "com.datastax.oss"
8+
module = "java-driver-core"
9+
versions = "[4.0,]"
10+
assertInverse = true
11+
}
12+
}
13+
14+
apply from: "$rootDir/gradle/java.gradle"
15+
16+
apply plugin: 'org.unbroken-dome.test-sets'
17+
18+
testSets {
19+
latestDepTest {
20+
dirName = 'test'
21+
}
22+
}
23+
24+
dependencies {
25+
compileOnly group: 'com.datastax.oss', name: 'java-driver-core', version: '4.0.0'
26+
main_java8CompileOnly group: 'com.datastax.oss', name: 'java-driver-core', version: '4.0.0'
27+
28+
// ProgrammaticConfig, required to set the timeout, wasn't added until 4.0.1
29+
testCompile group: 'com.datastax.oss', name: 'java-driver-core', version: '4.0.1'
30+
testCompile group: 'org.cassandraunit', name: 'cassandra-unit', version: '4.3.1.0'
31+
32+
// Force 0.3.3 because 0.3.0 has a version parsing bug that fails on jdk 15
33+
testCompile group: 'com.github.jbellis', name: "jamm", version: '0.3.3'
34+
35+
testCompile project(':dd-java-agent:instrumentation:guava-10')
36+
37+
latestDepTestCompile group: 'com.datastax.oss', name: 'java-driver-core', version: '+'
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package datadog.trace.instrumentation.datastax.cassandra4;
2+
3+
import static java.util.Collections.singletonMap;
4+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
5+
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
6+
import static net.bytebuddy.matcher.ElementMatchers.named;
7+
import static net.bytebuddy.matcher.ElementMatchers.returns;
8+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
9+
10+
import com.google.auto.service.AutoService;
11+
import datadog.trace.agent.tooling.Instrumenter;
12+
import java.util.Map;
13+
import net.bytebuddy.description.method.MethodDescription;
14+
import net.bytebuddy.description.type.TypeDescription;
15+
import net.bytebuddy.matcher.ElementMatcher;
16+
17+
@AutoService(Instrumenter.class)
18+
public class CassandraClientInstrumentation extends Instrumenter.Default {
19+
20+
public CassandraClientInstrumentation() {
21+
super("cassandra");
22+
}
23+
24+
@Override
25+
public ElementMatcher<TypeDescription> typeMatcher() {
26+
return named("com.datastax.oss.driver.internal.core.session.DefaultSession");
27+
}
28+
29+
@Override
30+
public String[] helperClassNames() {
31+
return new String[] {
32+
packageName + ".CassandraClientDecorator", packageName + ".TracingSession"
33+
};
34+
}
35+
36+
@Override
37+
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
38+
return singletonMap(
39+
isMethod()
40+
.and(named("init"))
41+
.and(isStatic())
42+
.and(takesArguments(3))
43+
.and(returns(named("java.util.concurrent.CompletionStage"))),
44+
packageName + ".CassandraClientAdvice");
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package datadog.trace.instrumentation.datastax.cassandra4;
2+
3+
import com.datastax.oss.driver.api.core.session.Session;
4+
import java.util.concurrent.CompletionStage;
5+
import net.bytebuddy.asm.Advice;
6+
7+
public class CassandraClientAdvice {
8+
9+
@Advice.OnMethodExit(suppress = Throwable.class)
10+
public static void injectTracingSession(
11+
@Advice.Return(readOnly = false) CompletionStage<Session> completionStage) {
12+
13+
// Change CompletingStage<Session> to CompletingStage<TracingSession>
14+
// The TracingSession wrapper includes span start/stop
15+
completionStage = completionStage.thenApply(TracingSession::new);
16+
}
17+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package datadog.trace.instrumentation.datastax.cassandra4;
2+
3+
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
4+
import com.datastax.oss.driver.api.core.cql.ResultSet;
5+
import com.datastax.oss.driver.api.core.metadata.Node;
6+
import com.datastax.oss.driver.api.core.servererrors.CoordinatorException;
7+
import com.datastax.oss.driver.api.core.session.Session;
8+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
9+
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
10+
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
11+
import datadog.trace.bootstrap.instrumentation.decorator.DBTypeProcessingDatabaseClientDecorator;
12+
import java.net.InetSocketAddress;
13+
import java.net.SocketAddress;
14+
import java.util.Objects;
15+
16+
public class CassandraClientDecorator extends DBTypeProcessingDatabaseClientDecorator<Session> {
17+
18+
public static final CharSequence CASSANDRA_EXECUTE = UTF8BytesString.create("cassandra.execute");
19+
public static final CharSequence JAVA_CASSANDRA = UTF8BytesString.create("java-cassandra");
20+
21+
public static final CassandraClientDecorator DECORATE = new CassandraClientDecorator();
22+
23+
@Override
24+
protected String[] instrumentationNames() {
25+
return new String[] {"cassandra"};
26+
}
27+
28+
@Override
29+
protected String service() {
30+
return "cassandra";
31+
}
32+
33+
@Override
34+
protected CharSequence component() {
35+
return JAVA_CASSANDRA;
36+
}
37+
38+
@Override
39+
protected CharSequence spanType() {
40+
return InternalSpanTypes.CASSANDRA;
41+
}
42+
43+
@Override
44+
protected String dbType() {
45+
return "cassandra";
46+
}
47+
48+
@Override
49+
protected String dbUser(final Session session) {
50+
return null;
51+
}
52+
53+
@Override
54+
protected String dbInstance(final Session session) {
55+
return session.getKeyspace().map(Objects::toString).orElse(null);
56+
}
57+
58+
@Override
59+
protected String dbHostname(Session session) {
60+
return null;
61+
}
62+
63+
public AgentSpan onResponse(final AgentSpan span, final ResultSet result) {
64+
if (result != null) {
65+
return onResponse(span, result.getExecutionInfo().getCoordinator());
66+
}
67+
68+
return span;
69+
}
70+
71+
public AgentSpan onResponse(final AgentSpan span, final AsyncResultSet result) {
72+
if (result != null) {
73+
return onResponse(span, result.getExecutionInfo().getCoordinator());
74+
}
75+
76+
return span;
77+
}
78+
79+
@Override
80+
public AgentSpan onError(final AgentSpan span, final Throwable throwable) {
81+
super.onError(span, throwable);
82+
83+
if (throwable instanceof CoordinatorException) {
84+
onResponse(span, ((CoordinatorException) throwable).getCoordinator());
85+
}
86+
87+
return span;
88+
}
89+
90+
private AgentSpan onResponse(AgentSpan span, Node coordinator) {
91+
if (coordinator != null) {
92+
SocketAddress address = coordinator.getEndPoint().resolve();
93+
if (address instanceof InetSocketAddress) {
94+
onPeerConnection(span, (InetSocketAddress) address);
95+
}
96+
}
97+
98+
return span;
99+
}
100+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package datadog.trace.instrumentation.datastax.cassandra4;
2+
3+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
4+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
5+
import static datadog.trace.instrumentation.datastax.cassandra4.CassandraClientDecorator.CASSANDRA_EXECUTE;
6+
import static datadog.trace.instrumentation.datastax.cassandra4.CassandraClientDecorator.DECORATE;
7+
import static datadog.trace.util.AgentThreadFactory.AgentThread.TRACE_CASSANDRA_ASYNC_SESSION;
8+
9+
import com.datastax.oss.driver.api.core.CqlSession;
10+
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
11+
import com.datastax.oss.driver.api.core.cql.BoundStatement;
12+
import com.datastax.oss.driver.api.core.cql.ResultSet;
13+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
14+
import com.datastax.oss.driver.api.core.cql.Statement;
15+
import com.datastax.oss.driver.api.core.session.Request;
16+
import com.datastax.oss.driver.api.core.session.Session;
17+
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
18+
import com.datastax.oss.driver.internal.core.session.SessionWrapper;
19+
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
20+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
21+
import datadog.trace.util.AgentThreadFactory;
22+
import edu.umd.cs.findbugs.annotations.NonNull;
23+
import edu.umd.cs.findbugs.annotations.Nullable;
24+
import java.util.concurrent.CompletionException;
25+
import java.util.concurrent.CompletionStage;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
28+
29+
public class TracingSession extends SessionWrapper implements CqlSession {
30+
private static final ExecutorService EXECUTOR_SERVICE =
31+
Executors.newCachedThreadPool(new AgentThreadFactory(TRACE_CASSANDRA_ASYNC_SESSION));
32+
33+
public TracingSession(final Session session) {
34+
super(session);
35+
}
36+
37+
@Override
38+
@Nullable
39+
public <RequestT extends Request, ResultT> ResultT execute(
40+
@NonNull RequestT request, @NonNull GenericType<ResultT> resultType) {
41+
42+
if (request instanceof Statement && resultType.equals(Statement.SYNC)) {
43+
return (ResultT) wrapSyncRequest((Statement) request);
44+
} else if (request instanceof Statement && resultType.equals(Statement.ASYNC)) {
45+
return (ResultT) wrapAsyncRequest((Statement) request);
46+
} else {
47+
// PrepareRequest or unknown request: just forward to delegate
48+
return getDelegate().execute(request, resultType);
49+
}
50+
}
51+
52+
private ResultSet wrapSyncRequest(Statement request) {
53+
AgentSpan span = startSpan(CASSANDRA_EXECUTE);
54+
55+
DECORATE.afterStart(span);
56+
DECORATE.onConnection(span, getDelegate());
57+
DECORATE.onStatement(span, getQuery(request));
58+
59+
try (AgentScope scope = activateSpan(span)) {
60+
ResultSet resultSet = getDelegate().execute(request, Statement.SYNC);
61+
DECORATE.onResponse(span, resultSet);
62+
DECORATE.beforeFinish(span);
63+
64+
return resultSet;
65+
} catch (Exception e) {
66+
DECORATE.onError(span, e);
67+
DECORATE.beforeFinish(span);
68+
69+
throw e;
70+
} finally {
71+
span.finish();
72+
}
73+
}
74+
75+
private CompletionStage<AsyncResultSet> wrapAsyncRequest(Statement request) {
76+
AgentSpan span = startSpan(CASSANDRA_EXECUTE);
77+
78+
DECORATE.afterStart(span);
79+
DECORATE.onConnection(span, getDelegate());
80+
DECORATE.onStatement(span, getQuery(request));
81+
82+
try (AgentScope scope = activateSpan(span)) {
83+
CompletionStage<AsyncResultSet> completionStage =
84+
getDelegate().execute(request, Statement.ASYNC);
85+
86+
return completionStage.whenComplete(
87+
(result, throwable) -> {
88+
if (result != null) {
89+
DECORATE.onResponse(span, result);
90+
}
91+
92+
if (throwable instanceof CompletionException) {
93+
throwable = throwable.getCause();
94+
}
95+
DECORATE.onError(span, throwable);
96+
span.finish();
97+
});
98+
}
99+
}
100+
101+
private static String getQuery(final Statement statement) {
102+
String query = null;
103+
if (statement instanceof BoundStatement) {
104+
query = ((BoundStatement) statement).getPreparedStatement().getQuery();
105+
} else if (statement instanceof SimpleStatement) {
106+
query = ((SimpleStatement) statement).getQuery();
107+
}
108+
109+
return query == null ? "" : query;
110+
}
111+
}

0 commit comments

Comments
 (0)