Skip to content

Commit

Permalink
Cassandra: allow extracting keyspace from statement result
Browse files Browse the repository at this point in the history
  • Loading branch information
amarziali committed Jan 17, 2025
1 parent f1d359b commit e6257ec
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,7 @@ public String getDbType() {
public AgentSpan onConnection(final AgentSpan span, final CONNECTION connection) {
if (connection != null) {
span.setTag(Tags.DB_USER, dbUser(connection));
final String instanceName = dbInstance(connection);
span.setTag(Tags.DB_INSTANCE, instanceName);

String serviceName = dbClientService(instanceName);
if (null != serviceName) {
span.setServiceName(serviceName);
}

onInstance(span, dbInstance(connection));
CharSequence hostName = dbHostname(connection);
if (hostName != null) {
span.setTag(Tags.PEER_HOSTNAME, hostName);
Expand All @@ -90,6 +83,17 @@ public AgentSpan onConnection(final AgentSpan span, final CONNECTION connection)
return span;
}

protected AgentSpan onInstance(final AgentSpan span, final String dbInstance) {
if (dbInstance != null) {
span.setTag(Tags.DB_INSTANCE, dbInstance);
String serviceName = dbClientService(dbInstance);
if (null != serviceName) {
span.setServiceName(serviceName);
}
}
return span;
}

public String dbService(final String dbType, final String instanceName) {
if (instanceName != null && Config.get().isDbClientSplitByInstance()) {
return dbClientService(instanceName);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
import static datadog.trace.api.config.TraceInstrumentationConfig.CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED
import static datadog.trace.api.config.TraceInstrumentationConfig.DB_CLIENT_HOST_SPLIT_BY_INSTANCE

import com.datastax.driver.core.Cluster
Expand Down Expand Up @@ -57,8 +58,11 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
def "test sync"() {
setup:

Session session = cluster.connect(keyspace)
Session session = keyspace ? cluster.connect(keyspace) : cluster.connect()
injectSysConfig(DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "$renameService")
if (extractFromStatement) {
injectSysConfig(CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, "true")
}

when:
session.execute(statement)
Expand All @@ -71,27 +75,37 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
}
}
trace(1) {
cassandraSpan(it, statement, keyspace, renameService)
cassandraSpan(it, statement, expectedKeySpace, renameService)
}
}

cleanup:
session.close()

where:
statement | keyspace | renameService
"DROP KEYSPACE IF EXISTS sync_test" | null | false
"CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | true
"CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | false
"INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "sync_test" | false
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "sync_test" | true
statement | keyspace | expectedKeySpace | renameService | extractFromStatement
"DROP KEYSPACE IF EXISTS sync_test" | null | null | false | true
"DROP KEYSPACE IF EXISTS a_ks" | null | null | false | true
"CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | true
"CREATE KEYSPACE a_ks WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | false | true
"CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | "sync_test" | false | true
"INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "sync_test" | "sync_test" | false | true
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "sync_test" | "sync_test" | true | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "sync_test" | false | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "sync_test" | true | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | "sync_test" | false | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | "sync_test" | true | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | null | false | false
}

def "test async"() {
setup:

def callbackExecuted = new CountDownLatch(1)
injectSysConfig(DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "$renameService")
if (extractFromStatement) {
injectSysConfig(CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, "true")
}

when:
Session session = cluster.connect(keyspace)
Expand All @@ -117,7 +131,7 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
trace(3) {
sortSpansByStart()
basicSpan(it, "parent")
cassandraSpan(it, statement, keyspace, renameService, span(0))
cassandraSpan(it, statement, expectedKeySpace, renameService, span(0))
basicSpan(it, "callbackListener", span(0))
}
}
Expand All @@ -126,12 +140,17 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
session.close()

where:
statement | keyspace | renameService
"DROP KEYSPACE IF EXISTS async_test" | null | false
"CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | true
"CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | false
"INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "async_test" | false
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "async_test" | true
statement | keyspace | expectedKeySpace | renameService | extractFromStatement
"DROP KEYSPACE IF EXISTS async_test" | null | null | false | false
"DROP KEYSPACE IF EXISTS a_ks" | null | null | false | false
"CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | false
"CREATE KEYSPACE a_ks WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | false
"CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | "async_test" | false | false
"INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "async_test" | "async_test" | false | false
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "async_test" | "async_test" | false | false
"SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | null | null | false | false
"SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "a_ks" | false | false
"SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | null | "async_test" | true | true
}

String normalize(String statement){
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package datadog.trace.instrumentation.datastax.cassandra;

import static datadog.trace.bootstrap.instrumentation.api.Tags.DB_INSTANCE;

import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import datadog.trace.api.Config;
import datadog.trace.api.cache.DDCache;
import datadog.trace.api.cache.DDCaches;
import datadog.trace.api.naming.SpanNaming;
Expand All @@ -11,6 +15,7 @@
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.bootstrap.instrumentation.decorator.DBTypeProcessingDatabaseClientDecorator;
import datadog.trace.util.Strings;
import java.util.function.ToIntFunction;

public class CassandraClientDecorator extends DBTypeProcessingDatabaseClientDecorator<Session> {
Expand Down Expand Up @@ -82,6 +87,15 @@ public AgentSpan onResponse(final AgentSpan span, final ResultSet result) {
if (result != null) {
final Host host = result.getExecutionInfo().getQueriedHost();
onPeerConnection(span, host.getSocketAddress());
if (Config.get().isCassandraKeyspaceStatementExtractionEnabled()) {
final ColumnDefinitions defs = result.getColumnDefinitions();
if (defs != null && defs.size() > 0) {
final String keySpace = defs.getKeyspace(0);
if (Strings.isNotBlank(keySpace) && !keySpace.equals(span.getTag(DB_INSTANCE))) {
onInstance(span, keySpace);
}
}
}
}
return span;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
import static datadog.trace.api.config.TraceInstrumentationConfig.CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED
import static datadog.trace.api.config.TraceInstrumentationConfig.DB_CLIENT_HOST_SPLIT_BY_INSTANCE

import com.datastax.driver.core.Cluster
Expand Down Expand Up @@ -57,8 +58,11 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
def "test sync"() {
setup:

Session session = cluster.connect(keyspace)
Session session = keyspace ? cluster.connect(keyspace) : cluster.connect()
injectSysConfig(DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "$renameService")
if (extractFromStatement) {
injectSysConfig(CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, "true")
}

when:
session.execute(statement)
Expand All @@ -71,27 +75,37 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
}
}
trace(1) {
cassandraSpan(it, statement, keyspace, renameService)
cassandraSpan(it, statement, expectedKeySpace, renameService)
}
}

cleanup:
session.close()

where:
statement | keyspace | renameService
"DROP KEYSPACE IF EXISTS sync_test" | null | false
"CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | true
"CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | false
"INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "sync_test" | false
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "sync_test" | true
statement | keyspace | expectedKeySpace | renameService | extractFromStatement
"DROP KEYSPACE IF EXISTS sync_test" | null | null | false | true
"DROP KEYSPACE IF EXISTS a_ks" | null | null | false | true
"CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | true
"CREATE KEYSPACE a_ks WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | false | true
"CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | "sync_test" | false | true
"INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "sync_test" | "sync_test" | false | true
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "sync_test" | "sync_test" | true | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "sync_test" | false | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "sync_test" | true | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | "sync_test" | false | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | "sync_test" | true | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | null | false | false
}

def "test async"() {
setup:

def callbackExecuted = new CountDownLatch(1)
injectSysConfig(DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "$renameService")
if (extractFromStatement) {
injectSysConfig(CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, "true")
}

when:
Session session = cluster.connect(keyspace)
Expand All @@ -117,7 +131,7 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
trace(3) {
sortSpansByStart()
basicSpan(it, "parent")
cassandraSpan(it, statement, keyspace, renameService, span(0))
cassandraSpan(it, statement, expectedKeySpace, renameService, span(0))
basicSpan(it, "callbackListener", span(0))
}
}
Expand All @@ -126,12 +140,17 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
session.close()

where:
statement | keyspace | renameService
"DROP KEYSPACE IF EXISTS async_test" | null | false
"CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | true
"CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | false
"INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "async_test" | false
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "async_test" | true
statement | keyspace | expectedKeySpace | renameService | extractFromStatement
"DROP KEYSPACE IF EXISTS async_test" | null | null | false | false
"DROP KEYSPACE IF EXISTS a_ks" | null | null | false | false
"CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | false
"CREATE KEYSPACE a_ks WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | false
"CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | "async_test" | false | false
"INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "async_test" | "async_test" | false | false
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "async_test" | "async_test" | false | false
"SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | null | null | false | false
"SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "a_ks" | false | false
"SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | null | "async_test" | true | true
}

String normalize(String statement){
Expand Down
Loading

0 comments on commit e6257ec

Please sign in to comment.