From 1a7e0f32351fd323293f16363ba2dd0bff504846 Mon Sep 17 00:00:00 2001 From: Simone Giusso Date: Wed, 8 Mar 2023 16:55:00 +0100 Subject: [PATCH] Instrumenting cassandra executeReactive method (#6441) It follows the [issue](https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/6395#issue-1323561263) I opened some days ago. The `executeReactive` method use the same processor used by `executeAsync` (see [here](https://github.com/datastax/java-driver/blob/65d2c19c401175dcc6c370560dd5f783d05b05b9/core/src/main/java/com/datastax/dse/driver/internal/core/cql/reactive/CqlRequestReactiveProcessor.java#L53)) and wrap the callback in the `DefaultReactiveResultSet` publisher. Here I'm simply overriding the `executeReactive` method doing the same thing: call the already instrumented `executeAsync` method and wrapping the callback using the `DefaultReactiveResultSet` publisher. ~~I did an upgrade of the `java-driver-core` library to have `TracingCqlSession.java` extending the `ReactiveSession`. I have to probably rename the `cassandra-4.0` module in `cassandra-4.14` but I'll let you confirm this.~~ -> Cassandra-4.4 is enough. --------- Co-authored-by: Trask Stalnaker --- docs/supported-libraries.md | 2 +- .../cassandra-3.0/javaagent/build.gradle.kts | 3 + .../testing/build.gradle.kts | 10 + .../v4/common/AbstractCassandraTest.java} | 163 ++++++----- .../cassandra-4.0/javaagent/build.gradle.kts | 7 +- .../CassandraClientInstrumentationModule.java | 10 + .../src/test/java/CassandraTest.java | 20 ++ .../cassandra-4.4/javaagent/build.gradle.kts | 33 +++ .../CassandraClientInstrumentationModule.java | 34 +++ .../cassandra/v4_4/CassandraSingletons.java | 20 ++ .../v4_4/CompletionStageFunction.java | 24 ++ .../v4_4/SessionBuilderInstrumentation.java | 53 ++++ .../cassandra/v4_4/CassandraTest.java | 22 ++ .../cassandra-4.4/library/build.gradle.kts | 12 + .../v4_4/CassandraAttributesExtractor.java | 77 +++++ .../v4_4/CassandraNetAttributesGetter.java | 52 ++++ .../cassandra/v4_4/CassandraRequest.java | 21 ++ .../v4_4/CassandraSqlAttributesGetter.java | 43 +++ .../cassandra/v4_4/CassandraTelemetry.java | 44 +++ .../v4_4/CassandraTelemetryBuilder.java | 66 +++++ .../cassandra/v4_4/TracingCqlSession.java | 273 ++++++++++++++++++ .../cassandra/v4_4/CassandraTest.java | 28 ++ .../cassandra-4.4/testing/build.gradle.kts | 12 + .../v4_4/AbstractCassandra44Test.java | 139 +++++++++ settings.gradle.kts | 4 + 25 files changed, 1095 insertions(+), 77 deletions(-) create mode 100644 instrumentation/cassandra/cassandra-4-common/testing/build.gradle.kts rename instrumentation/cassandra/{cassandra-4.0/javaagent/src/test/java/CassandraClientTest.java => cassandra-4-common/testing/src/main/java/io/opentelemetry/cassandra/v4/common/AbstractCassandraTest.java} (64%) create mode 100644 instrumentation/cassandra/cassandra-4.0/javaagent/src/test/java/CassandraTest.java create mode 100644 instrumentation/cassandra/cassandra-4.4/javaagent/build.gradle.kts create mode 100644 instrumentation/cassandra/cassandra-4.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/CassandraClientInstrumentationModule.java create mode 100644 instrumentation/cassandra/cassandra-4.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/CassandraSingletons.java create mode 100644 instrumentation/cassandra/cassandra-4.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/CompletionStageFunction.java create mode 100644 instrumentation/cassandra/cassandra-4.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/SessionBuilderInstrumentation.java create mode 100644 instrumentation/cassandra/cassandra-4.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/CassandraTest.java create mode 100644 instrumentation/cassandra/cassandra-4.4/library/build.gradle.kts create mode 100644 instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraAttributesExtractor.java create mode 100644 instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraNetAttributesGetter.java create mode 100644 instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java create mode 100644 instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraSqlAttributesGetter.java create mode 100644 instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraTelemetry.java create mode 100644 instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraTelemetryBuilder.java create mode 100644 instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/TracingCqlSession.java create mode 100644 instrumentation/cassandra/cassandra-4.4/library/src/test/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraTest.java create mode 100644 instrumentation/cassandra/cassandra-4.4/testing/build.gradle.kts create mode 100644 instrumentation/cassandra/cassandra-4.4/testing/src/main/java/io/opentelemetry/testing/cassandra/v4_4/AbstractCassandra44Test.java diff --git a/docs/supported-libraries.md b/docs/supported-libraries.md index 781306aaa6c5..17ab03d5262e 100644 --- a/docs/supported-libraries.md +++ b/docs/supported-libraries.md @@ -42,7 +42,7 @@ These are the supported libraries and frameworks: | [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/java-handler.html) | 1.0+ | [opentelemetry-aws-lambda-core-1.0](../instrumentation/aws-lambda/aws-lambda-core-1.0/library),
[opentelemetry-aws-lambda-events-2.2](../instrumentation/aws-lambda/aws-lambda-events-2.2/library) | [FaaS Server Spans] | | [AWS SDK](https://aws.amazon.com/sdk-for-java/) | 1.11.x and 2.2.0+ | [opentelemetry-aws-sdk-1.11](../instrumentation/aws-sdk/aws-sdk-1.11/library),
[opentelemetry-aws-sdk-1.11-autoconfigure](../instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure),
[opentelemetry-aws-sdk-2.2](../instrumentation/aws-sdk/aws-sdk-2.2/library),
[opentelemetry-aws-sdk-2.2-autoconfigure](../instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure) | [Messaging Spans], [Database Client Spans], [HTTP Client Spans] | | [Azure Core](https://docs.microsoft.com/en-us/java/api/overview/azure/core-readme) | 1.14+ | N/A | Context propagation | -| [Cassandra Driver](https://github.com/datastax/java-driver) | 3.0+ | N/A | [Database Client Spans] | +| [Cassandra Driver](https://github.com/datastax/java-driver) | 3.0+ | [opentelemetry-cassandra-4.4](../instrumentation/cassandra/cassandra-4.4/library) | [Database Client Spans] | | [Couchbase Client](https://github.com/couchbase/couchbase-java-client) | 2.0+ and 3.1+ | N/A | [Database Client Spans] | | [c3p0](https://github.com/swaldman/c3p0) | 0.9.2+ | [opentelemetry-c3p0-0.9](../instrumentation/c3p0-0.9/library) | [Database Pool Metrics] | | [Dropwizard Metrics](https://metrics.dropwizard.io/) | 4.0+ (disabled by default) | N/A | none | diff --git a/instrumentation/cassandra/cassandra-3.0/javaagent/build.gradle.kts b/instrumentation/cassandra/cassandra-3.0/javaagent/build.gradle.kts index 3bbf82fe3370..83514dcd1f25 100644 --- a/instrumentation/cassandra/cassandra-3.0/javaagent/build.gradle.kts +++ b/instrumentation/cassandra/cassandra-3.0/javaagent/build.gradle.kts @@ -35,6 +35,9 @@ dependencies { testInstrumentation(project(":instrumentation:guava-10.0:javaagent")) latestDepTestLibrary("com.datastax.cassandra:cassandra-driver-core:3.+") // see cassandra-4.0 module + + testInstrumentation(project(":instrumentation:cassandra:cassandra-4.0:javaagent")) + testInstrumentation(project(":instrumentation:cassandra:cassandra-4.4:javaagent")) } // Requires old Guava. Can't use enforcedPlatform since predates BOM diff --git a/instrumentation/cassandra/cassandra-4-common/testing/build.gradle.kts b/instrumentation/cassandra/cassandra-4-common/testing/build.gradle.kts new file mode 100644 index 000000000000..3d12c28f94e9 --- /dev/null +++ b/instrumentation/cassandra/cassandra-4-common/testing/build.gradle.kts @@ -0,0 +1,10 @@ +plugins { + id("otel.java-conventions") +} + +dependencies { + api(project(":testing-common")) + + implementation("org.testcontainers:testcontainers:1.17.5") + implementation("com.datastax.oss:java-driver-core:4.0.0") +} diff --git a/instrumentation/cassandra/cassandra-4.0/javaagent/src/test/java/CassandraClientTest.java b/instrumentation/cassandra/cassandra-4-common/testing/src/main/java/io/opentelemetry/cassandra/v4/common/AbstractCassandraTest.java similarity index 64% rename from instrumentation/cassandra/cassandra-4.0/javaagent/src/test/java/CassandraClientTest.java rename to instrumentation/cassandra/cassandra-4-common/testing/src/main/java/io/opentelemetry/cassandra/v4/common/AbstractCassandraTest.java index 3ffd1b1e21e9..9b3b06a5770f 100644 --- a/instrumentation/cassandra/cassandra-4.0/javaagent/src/test/java/CassandraClientTest.java +++ b/instrumentation/cassandra/cassandra-4-common/testing/src/main/java/io/opentelemetry/cassandra/v4/common/AbstractCassandraTest.java @@ -3,6 +3,8 @@ * SPDX-License-Identifier: Apache-2.0 */ +package io.opentelemetry.cassandra.v4.common; + import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CASSANDRA_CONSISTENCY_LEVEL; @@ -26,14 +28,12 @@ import com.datastax.oss.driver.api.core.config.DriverConfigLoader; import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader; import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import java.net.InetSocketAddress; import java.time.Duration; import java.util.stream.Stream; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -42,17 +42,20 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; -public class CassandraClientTest { - - private static final Logger logger = LoggerFactory.getLogger(CassandraClientTest.class); +public abstract class AbstractCassandraTest { - @RegisterExtension - static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraTest.class); @SuppressWarnings("rawtypes") private static GenericContainer cassandra; - private static int cassandraPort; + protected static int cassandraPort; + + protected abstract InstrumentationExtension testing(); + + protected CqlSession wrap(CqlSession session) { + return session; + } @BeforeAll static void beforeAll() { @@ -79,30 +82,33 @@ void syncTest(Parameter parameter) { session.execute(parameter.statement); - testing.waitAndAssertTraces( - trace -> - trace.hasSpansSatisfyingExactly( - span -> - span.hasName(parameter.spanName) - .hasKind(SpanKind.CLIENT) - .hasNoParent() - .hasAttributesSatisfyingExactly( - equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"), - equalTo(NET_SOCK_PEER_NAME, "localhost"), - equalTo(NET_SOCK_PEER_PORT, cassandraPort), - equalTo(DB_SYSTEM, "cassandra"), - equalTo(DB_NAME, parameter.keyspace), - equalTo(DB_STATEMENT, parameter.expectedStatement), - equalTo(DB_OPERATION, parameter.operation), - equalTo(DB_CASSANDRA_CONSISTENCY_LEVEL, "LOCAL_ONE"), - equalTo(DB_CASSANDRA_COORDINATOR_DC, "datacenter1"), - satisfies( - DB_CASSANDRA_COORDINATOR_ID, val -> val.isInstanceOf(String.class)), - satisfies( - DB_CASSANDRA_IDEMPOTENCE, val -> val.isInstanceOf(Boolean.class)), - equalTo(DB_CASSANDRA_PAGE_SIZE, 5000), - equalTo(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, 0), - equalTo(DB_CASSANDRA_TABLE, parameter.table)))); + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName(parameter.spanName) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"), + equalTo(NET_SOCK_PEER_NAME, "localhost"), + equalTo(NET_SOCK_PEER_PORT, cassandraPort), + equalTo(DB_SYSTEM, "cassandra"), + equalTo(DB_NAME, parameter.keyspace), + equalTo(DB_STATEMENT, parameter.expectedStatement), + equalTo(DB_OPERATION, parameter.operation), + equalTo(DB_CASSANDRA_CONSISTENCY_LEVEL, "LOCAL_ONE"), + equalTo(DB_CASSANDRA_COORDINATOR_DC, "datacenter1"), + satisfies( + DB_CASSANDRA_COORDINATOR_ID, + val -> val.isInstanceOf(String.class)), + satisfies( + DB_CASSANDRA_IDEMPOTENCE, + val -> val.isInstanceOf(Boolean.class)), + equalTo(DB_CASSANDRA_PAGE_SIZE, 5000), + equalTo(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, 0), + equalTo(DB_CASSANDRA_TABLE, parameter.table)))); session.close(); } @@ -112,42 +118,48 @@ void syncTest(Parameter parameter) { void asyncTest(Parameter parameter) throws Exception { CqlSession session = getSession(parameter.keyspace); - testing.runWithSpan( - "parent", - () -> - session - .executeAsync(parameter.statement) - .toCompletableFuture() - .whenComplete((result, throwable) -> testing.runWithSpan("child", () -> {})) - .get()); + testing() + .runWithSpan( + "parent", + () -> + session + .executeAsync(parameter.statement) + .toCompletableFuture() + .whenComplete((result, throwable) -> testing().runWithSpan("child", () -> {})) + .get()); - testing.waitAndAssertTraces( - trace -> - trace.hasSpansSatisfyingExactly( - span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), - span -> - span.hasName(parameter.spanName) - .hasKind(SpanKind.CLIENT) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly( - equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"), - equalTo(NET_SOCK_PEER_NAME, "localhost"), - equalTo(NET_SOCK_PEER_PORT, cassandraPort), - equalTo(DB_SYSTEM, "cassandra"), - equalTo(DB_NAME, parameter.keyspace), - equalTo(DB_STATEMENT, parameter.expectedStatement), - equalTo(DB_OPERATION, parameter.operation), - equalTo(DB_CASSANDRA_CONSISTENCY_LEVEL, "LOCAL_ONE"), - equalTo(DB_CASSANDRA_COORDINATOR_DC, "datacenter1"), - satisfies( - DB_CASSANDRA_COORDINATOR_ID, val -> val.isInstanceOf(String.class)), - satisfies( - DB_CASSANDRA_IDEMPOTENCE, val -> val.isInstanceOf(Boolean.class)), - equalTo(DB_CASSANDRA_PAGE_SIZE, 5000), - equalTo(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, 0), - equalTo(DB_CASSANDRA_TABLE, parameter.table)), - span -> - span.hasName("child").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(0)))); + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(parameter.spanName) + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"), + equalTo(NET_SOCK_PEER_NAME, "localhost"), + equalTo(NET_SOCK_PEER_PORT, cassandraPort), + equalTo(DB_SYSTEM, "cassandra"), + equalTo(DB_NAME, parameter.keyspace), + equalTo(DB_STATEMENT, parameter.expectedStatement), + equalTo(DB_OPERATION, parameter.operation), + equalTo(DB_CASSANDRA_CONSISTENCY_LEVEL, "LOCAL_ONE"), + equalTo(DB_CASSANDRA_COORDINATOR_DC, "datacenter1"), + satisfies( + DB_CASSANDRA_COORDINATOR_ID, + val -> val.isInstanceOf(String.class)), + satisfies( + DB_CASSANDRA_IDEMPOTENCE, + val -> val.isInstanceOf(Boolean.class)), + equalTo(DB_CASSANDRA_PAGE_SIZE, 5000), + equalTo(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, 0), + equalTo(DB_CASSANDRA_TABLE, parameter.table)), + span -> + span.hasName("child") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)))); session.close(); } @@ -260,7 +272,7 @@ private static Stream provideAsyncParameters() { "users")))); } - private static class Parameter { + protected static class Parameter { public final String keyspace; public final String statement; public final String expectedStatement; @@ -284,16 +296,17 @@ public Parameter( } } - CqlSession getSession(String keyspace) { + protected CqlSession getSession(String keyspace) { DriverConfigLoader configLoader = DefaultDriverConfigLoader.builder() .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(0)) .build(); - return CqlSession.builder() - .addContactPoint(new InetSocketAddress("localhost", cassandraPort)) - .withConfigLoader(configLoader) - .withLocalDatacenter("datacenter1") - .withKeyspace(keyspace) - .build(); + return wrap( + CqlSession.builder() + .addContactPoint(new InetSocketAddress("localhost", cassandraPort)) + .withConfigLoader(configLoader) + .withLocalDatacenter("datacenter1") + .withKeyspace(keyspace) + .build()); } } diff --git a/instrumentation/cassandra/cassandra-4.0/javaagent/build.gradle.kts b/instrumentation/cassandra/cassandra-4.0/javaagent/build.gradle.kts index aad53b864a45..b3f073ab9c4b 100644 --- a/instrumentation/cassandra/cassandra-4.0/javaagent/build.gradle.kts +++ b/instrumentation/cassandra/cassandra-4.0/javaagent/build.gradle.kts @@ -6,7 +6,7 @@ muzzle { pass { group.set("com.datastax.oss") module.set("java-driver-core") - versions.set("[4.0,)") + versions.set("[4.0,4.4)") assertInverse.set(true) } } @@ -16,6 +16,11 @@ dependencies { compileOnly("com.google.auto.value:auto-value-annotations") annotationProcessor("com.google.auto.value:auto-value") + + testImplementation(project(":instrumentation:cassandra:cassandra-4-common:testing")) + + testInstrumentation(project(":instrumentation:cassandra:cassandra-3.0:javaagent")) + testInstrumentation(project(":instrumentation:cassandra:cassandra-4.4:javaagent")) } tasks { diff --git a/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraClientInstrumentationModule.java b/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraClientInstrumentationModule.java index fad308fe813a..1d450da1b951 100644 --- a/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraClientInstrumentationModule.java +++ b/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraClientInstrumentationModule.java @@ -5,15 +5,19 @@ package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0; +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; import static java.util.Collections.singletonList; +import static net.bytebuddy.matcher.ElementMatchers.not; import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import java.util.List; +import net.bytebuddy.matcher.ElementMatcher; @AutoService(InstrumentationModule.class) public class CassandraClientInstrumentationModule extends InstrumentationModule { + public CassandraClientInstrumentationModule() { super("cassandra", "cassandra-4.0"); } @@ -22,4 +26,10 @@ public CassandraClientInstrumentationModule() { public List typeInstrumentations() { return singletonList(new SessionBuilderInstrumentation()); } + + @Override + public ElementMatcher.Junction classLoaderMatcher() { + // new public interface introduced in version 4.4 + return not(hasClassesNamed("com.datastax.dse.driver.api.core.cql.reactive.ReactiveSession")); + } } diff --git a/instrumentation/cassandra/cassandra-4.0/javaagent/src/test/java/CassandraTest.java b/instrumentation/cassandra/cassandra-4.0/javaagent/src/test/java/CassandraTest.java new file mode 100644 index 000000000000..4db6a86efbdb --- /dev/null +++ b/instrumentation/cassandra/cassandra-4.0/javaagent/src/test/java/CassandraTest.java @@ -0,0 +1,20 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.cassandra.v4.common.AbstractCassandraTest; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class CassandraTest extends AbstractCassandraTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } +} diff --git a/instrumentation/cassandra/cassandra-4.4/javaagent/build.gradle.kts b/instrumentation/cassandra/cassandra-4.4/javaagent/build.gradle.kts new file mode 100644 index 000000000000..f2e744e7c677 --- /dev/null +++ b/instrumentation/cassandra/cassandra-4.4/javaagent/build.gradle.kts @@ -0,0 +1,33 @@ +plugins { + id("otel.javaagent-instrumentation") +} + +muzzle { + pass { + group.set("com.datastax.oss") + module.set("java-driver-core") + versions.set("[4.4,]") + assertInverse.set(true) + } +} + +dependencies { + implementation(project(":instrumentation:cassandra:cassandra-4.4:library")) + + library("com.datastax.oss:java-driver-core:4.4.0") + + compileOnly("com.google.auto.value:auto-value-annotations") + annotationProcessor("com.google.auto.value:auto-value") + + testImplementation("io.projectreactor:reactor-core:3.4.21") + testImplementation(project(":instrumentation:cassandra:cassandra-4.4:testing")) + + testInstrumentation(project(":instrumentation:cassandra:cassandra-3.0:javaagent")) + testInstrumentation(project(":instrumentation:cassandra:cassandra-4.0:javaagent")) +} + +tasks { + test { + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].getService()) + } +} diff --git a/instrumentation/cassandra/cassandra-4.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/CassandraClientInstrumentationModule.java b/instrumentation/cassandra/cassandra-4.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/CassandraClientInstrumentationModule.java new file mode 100644 index 000000000000..7736488a447d --- /dev/null +++ b/instrumentation/cassandra/cassandra-4.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/CassandraClientInstrumentationModule.java @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.cassandra.v4_4; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; +import static java.util.Collections.singletonList; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import java.util.List; +import net.bytebuddy.matcher.ElementMatcher; + +@AutoService(InstrumentationModule.class) +public class CassandraClientInstrumentationModule extends InstrumentationModule { + + public CassandraClientInstrumentationModule() { + super("cassandra", "cassandra-4.4"); + } + + @Override + public List typeInstrumentations() { + return singletonList(new SessionBuilderInstrumentation()); + } + + @Override + public ElementMatcher.Junction classLoaderMatcher() { + // new public interface introduced in version 4.4 + return hasClassesNamed("com.datastax.dse.driver.api.core.cql.reactive.ReactiveSession"); + } +} diff --git a/instrumentation/cassandra/cassandra-4.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/CassandraSingletons.java b/instrumentation/cassandra/cassandra-4.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/CassandraSingletons.java new file mode 100644 index 000000000000..7803fbd26174 --- /dev/null +++ b/instrumentation/cassandra/cassandra-4.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/CassandraSingletons.java @@ -0,0 +1,20 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.cassandra.v4_4; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.cassandra.v4_4.CassandraTelemetry; +import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig; + +final class CassandraSingletons { + + static final CassandraTelemetry telemetry = + CassandraTelemetry.builder(GlobalOpenTelemetry.get()) + .setStatementSanitizationEnabled(CommonConfig.get().isStatementSanitizationEnabled()) + .build(); + + private CassandraSingletons() {} +} diff --git a/instrumentation/cassandra/cassandra-4.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/CompletionStageFunction.java b/instrumentation/cassandra/cassandra-4.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/CompletionStageFunction.java new file mode 100644 index 000000000000..51fdd3d8749a --- /dev/null +++ b/instrumentation/cassandra/cassandra-4.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/CompletionStageFunction.java @@ -0,0 +1,24 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.cassandra.v4_4; + +import com.datastax.oss.driver.api.core.CqlSession; +import java.util.function.Function; + +public class CompletionStageFunction implements Function { + + @Override + public Object apply(Object session) { + if (session == null) { + return null; + } + // This should cover ours and OT's TracingCqlSession + if (session.getClass().getName().endsWith("cassandra4.TracingCqlSession")) { + return session; + } + return CassandraSingletons.telemetry.wrap((CqlSession) session); + } +} diff --git a/instrumentation/cassandra/cassandra-4.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/SessionBuilderInstrumentation.java b/instrumentation/cassandra/cassandra-4.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/SessionBuilderInstrumentation.java new file mode 100644 index 000000000000..913cc61123e4 --- /dev/null +++ b/instrumentation/cassandra/cassandra-4.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/SessionBuilderInstrumentation.java @@ -0,0 +1,53 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.cassandra.v4_4; + +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import java.util.concurrent.CompletionStage; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class SessionBuilderInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + // Note: Cassandra has a large driver and we instrument single class in it. + // The rest is ignored in AdditionalLibraryIgnoresMatcher + return named("com.datastax.oss.driver.api.core.session.SessionBuilder"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod().and(isPublic()).and(named("buildAsync")).and(takesArguments(0)), + SessionBuilderInstrumentation.class.getName() + "$BuildAdvice"); + } + + @SuppressWarnings("unused") + public static class BuildAdvice { + + /** + * Strategy: each time we build a connection to a Cassandra cluster, the + * com.datastax.oss.driver.api.core.session.SessionBuilder.buildAsync() method is called. The + * opentracing contribution is a simple wrapper, so we just have to wrap the new session. + * + * @param stage The fresh CompletionStage to patch. This stage produces session which is + * replaced with new session + */ + @Advice.OnMethodExit(suppress = Throwable.class) + public static void injectTracingSession( + @Advice.Return(readOnly = false) CompletionStage stage) { + stage = stage.thenApply(new CompletionStageFunction()); + } + } +} diff --git a/instrumentation/cassandra/cassandra-4.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/CassandraTest.java b/instrumentation/cassandra/cassandra-4.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/CassandraTest.java new file mode 100644 index 000000000000..bb281d630b02 --- /dev/null +++ b/instrumentation/cassandra/cassandra-4.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_4/CassandraTest.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.cassandra.v4_4; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.testing.cassandra.v4_4.AbstractCassandra44Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class CassandraTest extends AbstractCassandra44Test { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } +} diff --git a/instrumentation/cassandra/cassandra-4.4/library/build.gradle.kts b/instrumentation/cassandra/cassandra-4.4/library/build.gradle.kts new file mode 100644 index 000000000000..c0f0592742a9 --- /dev/null +++ b/instrumentation/cassandra/cassandra-4.4/library/build.gradle.kts @@ -0,0 +1,12 @@ +plugins { + id("otel.library-instrumentation") +} + +dependencies { + library("com.datastax.oss:java-driver-core:4.4.0") + + compileOnly("com.google.auto.value:auto-value-annotations") + annotationProcessor("com.google.auto.value:auto-value") + + testImplementation(project(":instrumentation:cassandra:cassandra-4.4:testing")) +} diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraAttributesExtractor.java b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraAttributesExtractor.java new file mode 100644 index 000000000000..08c93b9eb33f --- /dev/null +++ b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraAttributesExtractor.java @@ -0,0 +1,77 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.cassandra.v4_4; + +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; +import com.datastax.oss.driver.api.core.cql.Statement; +import com.datastax.oss.driver.api.core.metadata.Node; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import javax.annotation.Nullable; + +final class CassandraAttributesExtractor + implements AttributesExtractor { + + @Override + public void onStart( + AttributesBuilder attributes, Context parentContext, CassandraRequest request) {} + + @Override + public void onEnd( + AttributesBuilder attributes, + Context context, + CassandraRequest request, + @Nullable ExecutionInfo executionInfo, + @Nullable Throwable error) { + if (executionInfo == null) { + return; + } + + Node coordinator = executionInfo.getCoordinator(); + if (coordinator != null) { + if (coordinator.getDatacenter() != null) { + attributes.put(SemanticAttributes.DB_CASSANDRA_COORDINATOR_DC, coordinator.getDatacenter()); + } + if (coordinator.getHostId() != null) { + attributes.put( + SemanticAttributes.DB_CASSANDRA_COORDINATOR_ID, coordinator.getHostId().toString()); + } + } + attributes.put( + SemanticAttributes.DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, + executionInfo.getSpeculativeExecutionCount()); + + Statement statement = (Statement) executionInfo.getRequest(); + String consistencyLevel; + DriverExecutionProfile config = + request.getSession().getContext().getConfig().getDefaultProfile(); + if (statement.getConsistencyLevel() != null) { + consistencyLevel = statement.getConsistencyLevel().name(); + } else { + consistencyLevel = config.getString(DefaultDriverOption.REQUEST_CONSISTENCY); + } + attributes.put(SemanticAttributes.DB_CASSANDRA_CONSISTENCY_LEVEL, consistencyLevel); + + if (statement.getPageSize() > 0) { + attributes.put(SemanticAttributes.DB_CASSANDRA_PAGE_SIZE, statement.getPageSize()); + } else { + int pageSize = config.getInt(DefaultDriverOption.REQUEST_PAGE_SIZE); + if (pageSize > 0) { + attributes.put(SemanticAttributes.DB_CASSANDRA_PAGE_SIZE, pageSize); + } + } + + Boolean idempotent = statement.isIdempotent(); + if (idempotent == null) { + idempotent = config.getBoolean(DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE); + } + attributes.put(SemanticAttributes.DB_CASSANDRA_IDEMPOTENCE, idempotent); + } +} diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraNetAttributesGetter.java b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraNetAttributesGetter.java new file mode 100644 index 000000000000..27e64d5e302a --- /dev/null +++ b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraNetAttributesGetter.java @@ -0,0 +1,52 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.cassandra.v4_4; + +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; +import com.datastax.oss.driver.api.core.metadata.Node; +import io.opentelemetry.instrumentation.api.instrumenter.net.InetSocketAddressNetClientAttributesGetter; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import javax.annotation.Nullable; + +final class CassandraNetAttributesGetter + extends InetSocketAddressNetClientAttributesGetter { + + @Override + @Nullable + public String getTransport(CassandraRequest request, @Nullable ExecutionInfo executionInfo) { + return null; + } + + @Nullable + @Override + public String getPeerName(CassandraRequest request) { + return null; + } + + @Nullable + @Override + public Integer getPeerPort(CassandraRequest request) { + return null; + } + + @Override + @Nullable + protected InetSocketAddress getPeerSocketAddress( + CassandraRequest request, @Nullable ExecutionInfo executionInfo) { + if (executionInfo == null) { + return null; + } + Node coordinator = executionInfo.getCoordinator(); + if (coordinator == null) { + return null; + } + // resolve() returns an existing InetSocketAddress, it does not do a dns resolve, + // at least in the only current EndPoint implementation (DefaultEndPoint) + SocketAddress address = coordinator.getEndPoint().resolve(); + return address instanceof InetSocketAddress ? (InetSocketAddress) address : null; + } +} diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java new file mode 100644 index 000000000000..021dfc7b6c6d --- /dev/null +++ b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java @@ -0,0 +1,21 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.cassandra.v4_4; + +import com.datastax.oss.driver.api.core.session.Session; +import com.google.auto.value.AutoValue; + +@AutoValue +public abstract class CassandraRequest { + + public static CassandraRequest create(Session session, String statement) { + return new AutoValue_CassandraRequest(session, statement); + } + + public abstract Session getSession(); + + public abstract String getStatement(); +} diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraSqlAttributesGetter.java b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraSqlAttributesGetter.java new file mode 100644 index 000000000000..80ceadf8798d --- /dev/null +++ b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraSqlAttributesGetter.java @@ -0,0 +1,43 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.cassandra.v4_4; + +import com.datastax.oss.driver.api.core.CqlIdentifier; +import io.opentelemetry.instrumentation.api.instrumenter.db.SqlClientAttributesGetter; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import javax.annotation.Nullable; + +final class CassandraSqlAttributesGetter implements SqlClientAttributesGetter { + + @Override + public String getSystem(CassandraRequest request) { + return SemanticAttributes.DbSystemValues.CASSANDRA; + } + + @Override + @Nullable + public String getUser(CassandraRequest request) { + return null; + } + + @Override + @Nullable + public String getName(CassandraRequest request) { + return request.getSession().getKeyspace().map(CqlIdentifier::toString).orElse(null); + } + + @Override + @Nullable + public String getConnectionString(CassandraRequest request) { + return null; + } + + @Override + @Nullable + public String getRawStatement(CassandraRequest request) { + return request.getStatement(); + } +} diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraTelemetry.java b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraTelemetry.java new file mode 100644 index 000000000000..cb2214e8fbd3 --- /dev/null +++ b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraTelemetry.java @@ -0,0 +1,44 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.cassandra.v4_4; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; + +/** Entrypoint for instrumenting cassandra sessions. */ +public class CassandraTelemetry { + + /** Returns a new {@link CassandraTelemetry} configured with the given {@link OpenTelemetry}. */ + public static CassandraTelemetry create(OpenTelemetry openTelemetry) { + return builder(openTelemetry).build(); + } + + /** + * Returns a new {@link CassandraTelemetryBuilder} configured with the given {@link + * OpenTelemetry}. + */ + public static CassandraTelemetryBuilder builder(OpenTelemetry openTelemetry) { + return new CassandraTelemetryBuilder(openTelemetry); + } + + private final Instrumenter instrumenter; + + protected CassandraTelemetry(Instrumenter instrumenter) { + this.instrumenter = instrumenter; + } + + /** + * Construct a new tracing-enable CqlSession using the provided {@link CqlSession} instance. + * + * @param session An instance of CqlSession configured as desired. + * @return a {@link TracingCqlSession}. + */ + public CqlSession wrap(CqlSession session) { + return new TracingCqlSession(session, instrumenter); + } +} diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraTelemetryBuilder.java b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraTelemetryBuilder.java new file mode 100644 index 000000000000..bf8657a09550 --- /dev/null +++ b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraTelemetryBuilder.java @@ -0,0 +1,66 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.cassandra.v4_4; + +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.db.DbClientSpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.db.SqlClientAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesExtractor; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; + +/** A builder of {@link CassandraTelemetry}. */ +public class CassandraTelemetryBuilder { + + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.cassandra-4.4"; + + private final OpenTelemetry openTelemetry; + + private boolean statementSanitizationEnabled = true; + + protected CassandraTelemetryBuilder(OpenTelemetry openTelemetry) { + this.openTelemetry = openTelemetry; + } + + /** + * Sets whether the {@code db.statement} attribute on the spans emitted by the constructed {@link + * CassandraTelemetry} should be sanitized. If set to {@code true}, all parameters that can + * potentially contain sensitive information will be masked. Enabled by default. + */ + @CanIgnoreReturnValue + public CassandraTelemetryBuilder setStatementSanitizationEnabled(boolean enabled) { + this.statementSanitizationEnabled = enabled; + return this; + } + + /** + * Returns a new {@link CassandraTelemetry} with the settings of this {@link + * CassandraTelemetryBuilder}. + */ + public CassandraTelemetry build() { + return new CassandraTelemetry(createInstrumenter(openTelemetry, statementSanitizationEnabled)); + } + + protected Instrumenter createInstrumenter( + OpenTelemetry openTelemetry, boolean statementSanitizationEnabled) { + CassandraSqlAttributesGetter attributesGetter = new CassandraSqlAttributesGetter(); + + return Instrumenter.builder( + openTelemetry, INSTRUMENTATION_NAME, DbClientSpanNameExtractor.create(attributesGetter)) + .addAttributesExtractor( + SqlClientAttributesExtractor.builder(attributesGetter) + .setTableAttribute(SemanticAttributes.DB_CASSANDRA_TABLE) + .setStatementSanitizationEnabled(statementSanitizationEnabled) + .build()) + .addAttributesExtractor( + NetClientAttributesExtractor.create(new CassandraNetAttributesGetter())) + .addAttributesExtractor(new CassandraAttributesExtractor()) + .buildInstrumenter(SpanKindExtractor.alwaysClient()); + } +} diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/TracingCqlSession.java b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/TracingCqlSession.java new file mode 100644 index 000000000000..4a1c43c47d98 --- /dev/null +++ b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/TracingCqlSession.java @@ -0,0 +1,273 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.cassandra.v4_4; + +import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet; +import com.datastax.dse.driver.internal.core.cql.reactive.DefaultReactiveResultSet; +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.DriverException; +import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; +import com.datastax.oss.driver.api.core.cql.PrepareRequest; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.Statement; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metrics.Metrics; +import com.datastax.oss.driver.api.core.session.Request; +import com.datastax.oss.driver.api.core.type.reflect.GenericType; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; +import javax.annotation.Nullable; + +public class TracingCqlSession implements CqlSession { + private final CqlSession session; + + private final Instrumenter instrumenter; + + public TracingCqlSession( + CqlSession session, Instrumenter instrumenter) { + this.session = session; + this.instrumenter = instrumenter; + } + + @Override + public PreparedStatement prepare(SimpleStatement statement) { + return session.prepare(statement); + } + + @Override + public PreparedStatement prepare(String query) { + return session.prepare(query); + } + + @Override + public PreparedStatement prepare(PrepareRequest request) { + return session.prepare(request); + } + + @Override + public CompletionStage prepareAsync(SimpleStatement statement) { + return session.prepareAsync(statement); + } + + @Override + public CompletionStage prepareAsync(String query) { + return session.prepareAsync(query); + } + + @Override + public CompletionStage prepareAsync(PrepareRequest request) { + return session.prepareAsync(request); + } + + @Override + public String getName() { + return session.getName(); + } + + @Override + public Metadata getMetadata() { + return session.getMetadata(); + } + + @Override + public boolean isSchemaMetadataEnabled() { + return session.isSchemaMetadataEnabled(); + } + + @Override + public CompletionStage setSchemaMetadataEnabled(@Nullable Boolean newValue) { + return session.setSchemaMetadataEnabled(newValue); + } + + @Override + public CompletionStage refreshSchemaAsync() { + return session.refreshSchemaAsync(); + } + + @Override + public Metadata refreshSchema() { + return session.refreshSchema(); + } + + @Override + public CompletionStage checkSchemaAgreementAsync() { + return session.checkSchemaAgreementAsync(); + } + + @Override + public boolean checkSchemaAgreement() { + return session.checkSchemaAgreement(); + } + + @Override + public DriverContext getContext() { + return session.getContext(); + } + + @Override + public Optional getKeyspace() { + return session.getKeyspace(); + } + + @Override + public Optional getMetrics() { + return session.getMetrics(); + } + + @Override + public CompletionStage closeFuture() { + return session.closeFuture(); + } + + @Override + public boolean isClosed() { + return session.isClosed(); + } + + @Override + public CompletionStage closeAsync() { + return session.closeAsync(); + } + + @Override + public CompletionStage forceCloseAsync() { + return session.forceCloseAsync(); + } + + @Override + public void close() { + session.close(); + } + + @Override + @Nullable + public RESULT execute( + REQUEST request, GenericType resultType) { + return session.execute(request, resultType); + } + + @Override + public ResultSet execute(String query) { + CassandraRequest request = CassandraRequest.create(session, query); + Context context = instrumenter.start(Context.current(), request); + ResultSet resultSet; + try (Scope ignored = context.makeCurrent()) { + resultSet = session.execute(query); + } catch (RuntimeException e) { + instrumenter.end(context, request, getExecutionInfo(e), e); + throw e; + } + instrumenter.end(context, request, resultSet.getExecutionInfo(), null); + return resultSet; + } + + @Override + public ResultSet execute(Statement statement) { + String query = getQuery(statement); + CassandraRequest request = CassandraRequest.create(session, query); + Context context = instrumenter.start(Context.current(), request); + ResultSet resultSet; + try (Scope ignored = context.makeCurrent()) { + resultSet = session.execute(statement); + } catch (RuntimeException e) { + instrumenter.end(context, request, getExecutionInfo(e), e); + throw e; + } + instrumenter.end(context, request, resultSet.getExecutionInfo(), null); + return resultSet; + } + + @Override + public CompletionStage executeAsync(Statement statement) { + String query = getQuery(statement); + CassandraRequest request = CassandraRequest.create(session, query); + return executeAsync(request, () -> session.executeAsync(statement)); + } + + @Override + public CompletionStage executeAsync(String query) { + CassandraRequest request = CassandraRequest.create(session, query); + return executeAsync(request, () -> session.executeAsync(query)); + } + + private CompletionStage executeAsync( + CassandraRequest request, Supplier> query) { + Context parentContext = Context.current(); + Context context = instrumenter.start(parentContext, request); + try (Scope ignored = context.makeCurrent()) { + CompletionStage stage = query.get(); + return wrap( + stage.whenComplete( + (asyncResultSet, throwable) -> + instrumenter.end( + context, request, getExecutionInfo(asyncResultSet, throwable), throwable)), + parentContext); + } + } + + @Override + public ReactiveResultSet executeReactive(Statement statement) { + return new DefaultReactiveResultSet(() -> executeAsync(statement)); + } + + static CompletableFuture wrap(CompletionStage future, Context context) { + CompletableFuture result = new CompletableFuture<>(); + future.whenComplete( + (T value, Throwable throwable) -> { + try (Scope ignored = context.makeCurrent()) { + if (throwable != null) { + result.completeExceptionally(throwable); + } else { + result.complete(value); + } + } + }); + + return result; + } + + private static String getQuery(Statement statement) { + String query = null; + if (statement instanceof SimpleStatement) { + query = ((SimpleStatement) statement).getQuery(); + } else if (statement instanceof BoundStatement) { + query = ((BoundStatement) statement).getPreparedStatement().getQuery(); + } + + return query == null ? "" : query; + } + + private static ExecutionInfo getExecutionInfo( + @Nullable AsyncResultSet asyncResultSet, @Nullable Throwable throwable) { + if (asyncResultSet != null) { + return asyncResultSet.getExecutionInfo(); + } else { + return getExecutionInfo(throwable); + } + } + + private static ExecutionInfo getExecutionInfo(@Nullable Throwable throwable) { + if (throwable instanceof DriverException) { + return ((DriverException) throwable).getExecutionInfo(); + } else if (throwable != null && throwable.getCause() instanceof DriverException) { + // TODO (trask) find out if this is needed and if so add comment explaining + return ((DriverException) throwable.getCause()).getExecutionInfo(); + } else { + return null; + } + } +} diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/test/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraTest.java b/instrumentation/cassandra/cassandra-4.4/library/src/test/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraTest.java new file mode 100644 index 000000000000..dcb2bb7848f0 --- /dev/null +++ b/instrumentation/cassandra/cassandra-4.4/library/src/test/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraTest.java @@ -0,0 +1,28 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.cassandra.v4_4; + +import com.datastax.oss.driver.api.core.CqlSession; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import io.opentelemetry.testing.cassandra.v4_4.AbstractCassandra44Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class CassandraTest extends AbstractCassandra44Test { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected CqlSession wrap(CqlSession session) { + return CassandraTelemetry.create(testing.getOpenTelemetry()).wrap(session); + } +} diff --git a/instrumentation/cassandra/cassandra-4.4/testing/build.gradle.kts b/instrumentation/cassandra/cassandra-4.4/testing/build.gradle.kts new file mode 100644 index 000000000000..faaad8aa7951 --- /dev/null +++ b/instrumentation/cassandra/cassandra-4.4/testing/build.gradle.kts @@ -0,0 +1,12 @@ +plugins { + id("otel.java-conventions") +} + +dependencies { + api(project(":testing-common")) + + implementation("com.datastax.oss:java-driver-core:4.4.0") + implementation("io.projectreactor:reactor-core:3.5.3") + + api(project(":instrumentation:cassandra:cassandra-4-common:testing")) +} diff --git a/instrumentation/cassandra/cassandra-4.4/testing/src/main/java/io/opentelemetry/testing/cassandra/v4_4/AbstractCassandra44Test.java b/instrumentation/cassandra/cassandra-4.4/testing/src/main/java/io/opentelemetry/testing/cassandra/v4_4/AbstractCassandra44Test.java new file mode 100644 index 000000000000..33f00b1790df --- /dev/null +++ b/instrumentation/cassandra/cassandra-4.4/testing/src/main/java/io/opentelemetry/testing/cassandra/v4_4/AbstractCassandra44Test.java @@ -0,0 +1,139 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.testing.cassandra.v4_4; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CASSANDRA_CONSISTENCY_LEVEL; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CASSANDRA_COORDINATOR_DC; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CASSANDRA_COORDINATOR_ID; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CASSANDRA_IDEMPOTENCE; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CASSANDRA_PAGE_SIZE; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CASSANDRA_TABLE; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_NAME; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_OPERATION; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_STATEMENT; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_SYSTEM; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_SOCK_PEER_ADDR; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_SOCK_PEER_NAME; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_SOCK_PEER_PORT; +import static org.junit.jupiter.api.Named.named; + +import com.datastax.oss.driver.api.core.CqlSession; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.cassandra.v4.common.AbstractCassandraTest; +import java.util.stream.Stream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import reactor.core.publisher.Flux; + +public abstract class AbstractCassandra44Test extends AbstractCassandraTest { + + @ParameterizedTest(name = "{index}: {0}") + @MethodSource("provideReactiveParameters") + void reactiveTest(Parameter parameter) { + CqlSession session = getSession(parameter.keyspace); + + testing() + .runWithSpan( + "parent", + () -> + Flux.from(session.executeReactive(parameter.statement)) + .doOnComplete(() -> testing().runWithSpan("child", () -> {})) + .blockLast()); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(parameter.spanName) + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"), + equalTo(NET_SOCK_PEER_NAME, "localhost"), + equalTo(NET_SOCK_PEER_PORT, cassandraPort), + equalTo(DB_SYSTEM, "cassandra"), + equalTo(DB_NAME, parameter.keyspace), + equalTo(DB_STATEMENT, parameter.expectedStatement), + equalTo(DB_OPERATION, parameter.operation), + equalTo(DB_CASSANDRA_CONSISTENCY_LEVEL, "LOCAL_ONE"), + equalTo(DB_CASSANDRA_COORDINATOR_DC, "datacenter1"), + satisfies( + DB_CASSANDRA_COORDINATOR_ID, + val -> val.isInstanceOf(String.class)), + satisfies( + DB_CASSANDRA_IDEMPOTENCE, + val -> val.isInstanceOf(Boolean.class)), + equalTo(DB_CASSANDRA_PAGE_SIZE, 5000), + equalTo(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, 0), + equalTo(DB_CASSANDRA_TABLE, parameter.table)), + span -> + span.hasName("child") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)))); + + session.close(); + } + + private static Stream provideReactiveParameters() { + return Stream.of( + Arguments.of( + named( + "Drop keyspace if exists", + new Parameter( + null, + "DROP KEYSPACE IF EXISTS reactive_test", + "DROP KEYSPACE IF EXISTS reactive_test", + "DB Query", + null, + null))), + Arguments.of( + named( + "Create keyspace with replication", + new Parameter( + null, + "CREATE KEYSPACE reactive_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}", + "CREATE KEYSPACE reactive_test WITH REPLICATION = {?:?, ?:?}", + "DB Query", + null, + null))), + Arguments.of( + named( + "Create table", + new Parameter( + "reactive_test", + "CREATE TABLE reactive_test.users ( id UUID PRIMARY KEY, name text )", + "CREATE TABLE reactive_test.users ( id UUID PRIMARY KEY, name text )", + "reactive_test", + null, + null))), + Arguments.of( + named( + "Insert data", + new Parameter( + "reactive_test", + "INSERT INTO reactive_test.users (id, name) values (uuid(), 'alice')", + "INSERT INTO reactive_test.users (id, name) values (uuid(), ?)", + "INSERT reactive_test.users", + "INSERT", + "reactive_test.users"))), + Arguments.of( + named( + "Select data", + new Parameter( + "reactive_test", + "SELECT * FROM users where name = 'alice' ALLOW FILTERING", + "SELECT * FROM users where name = ? ALLOW FILTERING", + "SELECT reactive_test.users", + "SELECT", + "users")))); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 42e2132773e1..421a7361e515 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -174,6 +174,10 @@ hideFromDependabot(":instrumentation:azure-core:azure-core-1.36:javaagent") hideFromDependabot(":instrumentation:azure-core:azure-core-1.36:library-instrumentation-shaded") hideFromDependabot(":instrumentation:cassandra:cassandra-3.0:javaagent") hideFromDependabot(":instrumentation:cassandra:cassandra-4.0:javaagent") +hideFromDependabot(":instrumentation:cassandra:cassandra-4.4:javaagent") +hideFromDependabot(":instrumentation:cassandra:cassandra-4.4:library") +hideFromDependabot(":instrumentation:cassandra:cassandra-4.4:testing") +hideFromDependabot(":instrumentation:cassandra:cassandra-4-common:testing") hideFromDependabot(":instrumentation:cdi-testing") hideFromDependabot(":instrumentation:graphql-java-12.0:javaagent") hideFromDependabot(":instrumentation:graphql-java-12.0:library")