Skip to content

Commit

Permalink
Instrumenting cassandra executeReactive method (#6441)
Browse files Browse the repository at this point in the history
It follows the
[issue](#6395 (comment))
I opened some days ago.

The `executeReactive` method use the same processor used by
`executeAsync` (see
[here](https://github.com/datastax/java-driver/blob/65d2c19c401175dcc6c370560dd5f783d05b05b9/core/src/main/java/com/datastax/dse/driver/internal/core/cql/reactive/CqlRequestReactiveProcessor.java#L53))
and wrap the callback in the `DefaultReactiveResultSet` publisher.

Here I'm simply overriding the `executeReactive` method doing the same
thing: call the already instrumented `executeAsync` method and wrapping
the callback using the `DefaultReactiveResultSet` publisher.

~~I did an upgrade of the `java-driver-core` library to have
`TracingCqlSession.java` extending the `ReactiveSession`. I have to
probably rename the `cassandra-4.0` module in `cassandra-4.14` but I'll
let you confirm this.~~ -> Cassandra-4.4 is enough.

---------

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
  • Loading branch information
SimoneGiusso and trask authored Mar 8, 2023
1 parent 583e2a7 commit 1a7e0f3
Show file tree
Hide file tree
Showing 25 changed files with 1,095 additions and 77 deletions.
2 changes: 1 addition & 1 deletion docs/supported-libraries.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ These are the supported libraries and frameworks:
| [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/java-handler.html) | 1.0+ | [opentelemetry-aws-lambda-core-1.0](../instrumentation/aws-lambda/aws-lambda-core-1.0/library),<br>[opentelemetry-aws-lambda-events-2.2](../instrumentation/aws-lambda/aws-lambda-events-2.2/library) | [FaaS Server Spans] |
| [AWS SDK](https://aws.amazon.com/sdk-for-java/) | 1.11.x and 2.2.0+ | [opentelemetry-aws-sdk-1.11](../instrumentation/aws-sdk/aws-sdk-1.11/library),<br>[opentelemetry-aws-sdk-1.11-autoconfigure](../instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure),<br>[opentelemetry-aws-sdk-2.2](../instrumentation/aws-sdk/aws-sdk-2.2/library),<br>[opentelemetry-aws-sdk-2.2-autoconfigure](../instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure) | [Messaging Spans], [Database Client Spans], [HTTP Client Spans] |
| [Azure Core](https://docs.microsoft.com/en-us/java/api/overview/azure/core-readme) | 1.14+ | N/A | Context propagation |
| [Cassandra Driver](https://github.com/datastax/java-driver) | 3.0+ | N/A | [Database Client Spans] |
| [Cassandra Driver](https://github.com/datastax/java-driver) | 3.0+ | [opentelemetry-cassandra-4.4](../instrumentation/cassandra/cassandra-4.4/library) | [Database Client Spans] |
| [Couchbase Client](https://github.com/couchbase/couchbase-java-client) | 2.0+ and 3.1+ | N/A | [Database Client Spans] |
| [c3p0](https://github.com/swaldman/c3p0) | 0.9.2+ | [opentelemetry-c3p0-0.9](../instrumentation/c3p0-0.9/library) | [Database Pool Metrics] |
| [Dropwizard Metrics](https://metrics.dropwizard.io/) | 4.0+ (disabled by default) | N/A | none |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ dependencies {
testInstrumentation(project(":instrumentation:guava-10.0:javaagent"))

latestDepTestLibrary("com.datastax.cassandra:cassandra-driver-core:3.+") // see cassandra-4.0 module

testInstrumentation(project(":instrumentation:cassandra:cassandra-4.0:javaagent"))
testInstrumentation(project(":instrumentation:cassandra:cassandra-4.4:javaagent"))
}

// Requires old Guava. Can't use enforcedPlatform since predates BOM
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
plugins {
id("otel.java-conventions")
}

dependencies {
api(project(":testing-common"))

implementation("org.testcontainers:testcontainers:1.17.5")
implementation("com.datastax.oss:java-driver-core:4.0.0")
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.cassandra.v4.common;

import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CASSANDRA_CONSISTENCY_LEVEL;
Expand All @@ -26,14 +28,12 @@
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand All @@ -42,17 +42,20 @@
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;

public class CassandraClientTest {

private static final Logger logger = LoggerFactory.getLogger(CassandraClientTest.class);
public abstract class AbstractCassandraTest {

@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraTest.class);

@SuppressWarnings("rawtypes")
private static GenericContainer cassandra;

private static int cassandraPort;
protected static int cassandraPort;

protected abstract InstrumentationExtension testing();

protected CqlSession wrap(CqlSession session) {
return session;
}

@BeforeAll
static void beforeAll() {
Expand All @@ -79,30 +82,33 @@ void syncTest(Parameter parameter) {

session.execute(parameter.statement);

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName(parameter.spanName)
.hasKind(SpanKind.CLIENT)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"),
equalTo(NET_SOCK_PEER_NAME, "localhost"),
equalTo(NET_SOCK_PEER_PORT, cassandraPort),
equalTo(DB_SYSTEM, "cassandra"),
equalTo(DB_NAME, parameter.keyspace),
equalTo(DB_STATEMENT, parameter.expectedStatement),
equalTo(DB_OPERATION, parameter.operation),
equalTo(DB_CASSANDRA_CONSISTENCY_LEVEL, "LOCAL_ONE"),
equalTo(DB_CASSANDRA_COORDINATOR_DC, "datacenter1"),
satisfies(
DB_CASSANDRA_COORDINATOR_ID, val -> val.isInstanceOf(String.class)),
satisfies(
DB_CASSANDRA_IDEMPOTENCE, val -> val.isInstanceOf(Boolean.class)),
equalTo(DB_CASSANDRA_PAGE_SIZE, 5000),
equalTo(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, 0),
equalTo(DB_CASSANDRA_TABLE, parameter.table))));
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName(parameter.spanName)
.hasKind(SpanKind.CLIENT)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"),
equalTo(NET_SOCK_PEER_NAME, "localhost"),
equalTo(NET_SOCK_PEER_PORT, cassandraPort),
equalTo(DB_SYSTEM, "cassandra"),
equalTo(DB_NAME, parameter.keyspace),
equalTo(DB_STATEMENT, parameter.expectedStatement),
equalTo(DB_OPERATION, parameter.operation),
equalTo(DB_CASSANDRA_CONSISTENCY_LEVEL, "LOCAL_ONE"),
equalTo(DB_CASSANDRA_COORDINATOR_DC, "datacenter1"),
satisfies(
DB_CASSANDRA_COORDINATOR_ID,
val -> val.isInstanceOf(String.class)),
satisfies(
DB_CASSANDRA_IDEMPOTENCE,
val -> val.isInstanceOf(Boolean.class)),
equalTo(DB_CASSANDRA_PAGE_SIZE, 5000),
equalTo(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, 0),
equalTo(DB_CASSANDRA_TABLE, parameter.table))));

session.close();
}
Expand All @@ -112,42 +118,48 @@ void syncTest(Parameter parameter) {
void asyncTest(Parameter parameter) throws Exception {
CqlSession session = getSession(parameter.keyspace);

testing.runWithSpan(
"parent",
() ->
session
.executeAsync(parameter.statement)
.toCompletableFuture()
.whenComplete((result, throwable) -> testing.runWithSpan("child", () -> {}))
.get());
testing()
.runWithSpan(
"parent",
() ->
session
.executeAsync(parameter.statement)
.toCompletableFuture()
.whenComplete((result, throwable) -> testing().runWithSpan("child", () -> {}))
.get());

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(parameter.spanName)
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"),
equalTo(NET_SOCK_PEER_NAME, "localhost"),
equalTo(NET_SOCK_PEER_PORT, cassandraPort),
equalTo(DB_SYSTEM, "cassandra"),
equalTo(DB_NAME, parameter.keyspace),
equalTo(DB_STATEMENT, parameter.expectedStatement),
equalTo(DB_OPERATION, parameter.operation),
equalTo(DB_CASSANDRA_CONSISTENCY_LEVEL, "LOCAL_ONE"),
equalTo(DB_CASSANDRA_COORDINATOR_DC, "datacenter1"),
satisfies(
DB_CASSANDRA_COORDINATOR_ID, val -> val.isInstanceOf(String.class)),
satisfies(
DB_CASSANDRA_IDEMPOTENCE, val -> val.isInstanceOf(Boolean.class)),
equalTo(DB_CASSANDRA_PAGE_SIZE, 5000),
equalTo(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, 0),
equalTo(DB_CASSANDRA_TABLE, parameter.table)),
span ->
span.hasName("child").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(0))));
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(parameter.spanName)
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"),
equalTo(NET_SOCK_PEER_NAME, "localhost"),
equalTo(NET_SOCK_PEER_PORT, cassandraPort),
equalTo(DB_SYSTEM, "cassandra"),
equalTo(DB_NAME, parameter.keyspace),
equalTo(DB_STATEMENT, parameter.expectedStatement),
equalTo(DB_OPERATION, parameter.operation),
equalTo(DB_CASSANDRA_CONSISTENCY_LEVEL, "LOCAL_ONE"),
equalTo(DB_CASSANDRA_COORDINATOR_DC, "datacenter1"),
satisfies(
DB_CASSANDRA_COORDINATOR_ID,
val -> val.isInstanceOf(String.class)),
satisfies(
DB_CASSANDRA_IDEMPOTENCE,
val -> val.isInstanceOf(Boolean.class)),
equalTo(DB_CASSANDRA_PAGE_SIZE, 5000),
equalTo(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, 0),
equalTo(DB_CASSANDRA_TABLE, parameter.table)),
span ->
span.hasName("child")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));

session.close();
}
Expand Down Expand Up @@ -260,7 +272,7 @@ private static Stream<Arguments> provideAsyncParameters() {
"users"))));
}

private static class Parameter {
protected static class Parameter {
public final String keyspace;
public final String statement;
public final String expectedStatement;
Expand All @@ -284,16 +296,17 @@ public Parameter(
}
}

CqlSession getSession(String keyspace) {
protected CqlSession getSession(String keyspace) {
DriverConfigLoader configLoader =
DefaultDriverConfigLoader.builder()
.withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(0))
.build();
return CqlSession.builder()
.addContactPoint(new InetSocketAddress("localhost", cassandraPort))
.withConfigLoader(configLoader)
.withLocalDatacenter("datacenter1")
.withKeyspace(keyspace)
.build();
return wrap(
CqlSession.builder()
.addContactPoint(new InetSocketAddress("localhost", cassandraPort))
.withConfigLoader(configLoader)
.withLocalDatacenter("datacenter1")
.withKeyspace(keyspace)
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ muzzle {
pass {
group.set("com.datastax.oss")
module.set("java-driver-core")
versions.set("[4.0,)")
versions.set("[4.0,4.4)")
assertInverse.set(true)
}
}
Expand All @@ -16,6 +16,11 @@ dependencies {

compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")

testImplementation(project(":instrumentation:cassandra:cassandra-4-common:testing"))

testInstrumentation(project(":instrumentation:cassandra:cassandra-3.0:javaagent"))
testInstrumentation(project(":instrumentation:cassandra:cassandra-4.4:javaagent"))
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;

import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
import static java.util.Collections.singletonList;
import static net.bytebuddy.matcher.ElementMatchers.not;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;
import net.bytebuddy.matcher.ElementMatcher;

@AutoService(InstrumentationModule.class)
public class CassandraClientInstrumentationModule extends InstrumentationModule {

public CassandraClientInstrumentationModule() {
super("cassandra", "cassandra-4.0");
}
Expand All @@ -22,4 +26,10 @@ public CassandraClientInstrumentationModule() {
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new SessionBuilderInstrumentation());
}

@Override
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
// new public interface introduced in version 4.4
return not(hasClassesNamed("com.datastax.dse.driver.api.core.cql.reactive.ReactiveSession"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import io.opentelemetry.cassandra.v4.common.AbstractCassandraTest;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;

public class CassandraTest extends AbstractCassandraTest {

@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();

@Override
protected InstrumentationExtension testing() {
return testing;
}
}
Loading

0 comments on commit 1a7e0f3

Please sign in to comment.