Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
ext {
minJavaVersionForTests = JavaVersion.VERSION_1_8
}

muzzle {
pass {
group = "com.datastax.oss"
module = "java-driver-core"
versions = "[4.0,]"
assertInverse = true
}
}

apply from: "$rootDir/gradle/java.gradle"

apply plugin: 'org.unbroken-dome.test-sets'

testSets {
latestDepTest {
dirName = 'test'
}
}

dependencies {
compileOnly group: 'com.datastax.oss', name: 'java-driver-core', version: '4.0.0'
main_java8CompileOnly group: 'com.datastax.oss', name: 'java-driver-core', version: '4.0.0'

// ProgrammaticConfig, required to set the timeout, wasn't added until 4.0.1
testCompile group: 'com.datastax.oss', name: 'java-driver-core', version: '4.0.1'
testCompile group: 'org.cassandraunit', name: 'cassandra-unit', version: '4.3.1.0'

// Force 0.3.3 because 0.3.0 has a version parsing bug that fails on jdk 15
testCompile group: 'com.github.jbellis', name: "jamm", version: '0.3.3'

testCompile project(':dd-java-agent:instrumentation:guava-10')

latestDepTestCompile group: 'com.datastax.oss', name: 'java-driver-core', version: '+'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package datadog.trace.instrumentation.datastax.cassandra4;

import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import java.util.Map;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

@AutoService(Instrumenter.class)
public class CassandraClientInstrumentation extends Instrumenter.Default {

public CassandraClientInstrumentation() {
super("cassandra");
}

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("com.datastax.oss.driver.internal.core.session.DefaultSession");
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".CassandraClientDecorator", packageName + ".TracingSession"
};
}

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod()
.and(named("init"))
.and(isStatic())
.and(takesArguments(3))
.and(returns(named("java.util.concurrent.CompletionStage"))),
packageName + ".CassandraClientAdvice");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package datadog.trace.instrumentation.datastax.cassandra4;

import com.datastax.oss.driver.api.core.session.Session;
import java.util.concurrent.CompletionStage;
import net.bytebuddy.asm.Advice;

public class CassandraClientAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void injectTracingSession(
@Advice.Return(readOnly = false) CompletionStage<Session> completionStage) {

// Change CompletingStage<Session> to CompletingStage<TracingSession>
// The TracingSession wrapper includes span start/stop
completionStage = completionStage.thenApply(TracingSession::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package datadog.trace.instrumentation.datastax.cassandra4;

import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.servererrors.CoordinatorException;
import com.datastax.oss.driver.api.core.session.Session;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.bootstrap.instrumentation.decorator.DBTypeProcessingDatabaseClientDecorator;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Objects;

public class CassandraClientDecorator extends DBTypeProcessingDatabaseClientDecorator<Session> {

public static final CharSequence CASSANDRA_EXECUTE = UTF8BytesString.create("cassandra.execute");
public static final CharSequence JAVA_CASSANDRA = UTF8BytesString.create("java-cassandra");

public static final CassandraClientDecorator DECORATE = new CassandraClientDecorator();

@Override
protected String[] instrumentationNames() {
return new String[] {"cassandra"};
}

@Override
protected String service() {
return "cassandra";
}

@Override
protected CharSequence component() {
return JAVA_CASSANDRA;
}

@Override
protected CharSequence spanType() {
return InternalSpanTypes.CASSANDRA;
}

@Override
protected String dbType() {
return "cassandra";
}

@Override
protected String dbUser(final Session session) {
return null;
}

@Override
protected String dbInstance(final Session session) {
return session.getKeyspace().map(Objects::toString).orElse(null);
}

@Override
protected String dbHostname(Session session) {
return null;
}

public AgentSpan onResponse(final AgentSpan span, final ResultSet result) {
if (result != null) {
return onResponse(span, result.getExecutionInfo().getCoordinator());
}

return span;
}

public AgentSpan onResponse(final AgentSpan span, final AsyncResultSet result) {
if (result != null) {
return onResponse(span, result.getExecutionInfo().getCoordinator());
}

return span;
}

@Override
public AgentSpan onError(final AgentSpan span, final Throwable throwable) {
super.onError(span, throwable);

if (throwable instanceof CoordinatorException) {
onResponse(span, ((CoordinatorException) throwable).getCoordinator());
}

return span;
}

private AgentSpan onResponse(AgentSpan span, Node coordinator) {
if (coordinator != null) {
SocketAddress address = coordinator.getEndPoint().resolve();
if (address instanceof InetSocketAddress) {
onPeerConnection(span, (InetSocketAddress) address);
}
}

return span;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package datadog.trace.instrumentation.datastax.cassandra4;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.datastax.cassandra4.CassandraClientDecorator.CASSANDRA_EXECUTE;
import static datadog.trace.instrumentation.datastax.cassandra4.CassandraClientDecorator.DECORATE;
import static datadog.trace.util.AgentThreadFactory.AgentThread.TRACE_CASSANDRA_ASYNC_SESSION;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
import com.datastax.oss.driver.internal.core.session.SessionWrapper;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.util.AgentThreadFactory;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TracingSession extends SessionWrapper implements CqlSession {
private static final ExecutorService EXECUTOR_SERVICE =
Executors.newCachedThreadPool(new AgentThreadFactory(TRACE_CASSANDRA_ASYNC_SESSION));

public TracingSession(final Session session) {
super(session);
}

@Override
@Nullable
public <RequestT extends Request, ResultT> ResultT execute(
@NonNull RequestT request, @NonNull GenericType<ResultT> resultType) {

if (request instanceof Statement && resultType.equals(Statement.SYNC)) {
return (ResultT) wrapSyncRequest((Statement) request);
} else if (request instanceof Statement && resultType.equals(Statement.ASYNC)) {
return (ResultT) wrapAsyncRequest((Statement) request);
} else {
// PrepareRequest or unknown request: just forward to delegate
return getDelegate().execute(request, resultType);
}
}

private ResultSet wrapSyncRequest(Statement request) {
AgentSpan span = startSpan(CASSANDRA_EXECUTE);

DECORATE.afterStart(span);
DECORATE.onConnection(span, getDelegate());
DECORATE.onStatement(span, getQuery(request));

try (AgentScope scope = activateSpan(span)) {
ResultSet resultSet = getDelegate().execute(request, Statement.SYNC);
DECORATE.onResponse(span, resultSet);
DECORATE.beforeFinish(span);

return resultSet;
} catch (Exception e) {
DECORATE.onError(span, e);
DECORATE.beforeFinish(span);

throw e;
} finally {
span.finish();
}
}

private CompletionStage<AsyncResultSet> wrapAsyncRequest(Statement request) {
AgentSpan span = startSpan(CASSANDRA_EXECUTE);

DECORATE.afterStart(span);
DECORATE.onConnection(span, getDelegate());
DECORATE.onStatement(span, getQuery(request));

try (AgentScope scope = activateSpan(span)) {
CompletionStage<AsyncResultSet> completionStage =
getDelegate().execute(request, Statement.ASYNC);

return completionStage.whenComplete(
(result, throwable) -> {
if (result != null) {
DECORATE.onResponse(span, result);
}

if (throwable instanceof CompletionException) {
throwable = throwable.getCause();
}
DECORATE.onError(span, throwable);
span.finish();
});
}
}

private static String getQuery(final Statement statement) {
String query = null;
if (statement instanceof BoundStatement) {
query = ((BoundStatement) statement).getPreparedStatement().getQuery();
} else if (statement instanceof SimpleStatement) {
query = ((SimpleStatement) statement).getQuery();
}

return query == null ? "" : query;
}
}
Loading