Skip to content

Commit

Permalink
Allow configuration of CassandraObservationConvention.
Browse files Browse the repository at this point in the history
Closes #1490
  • Loading branch information
mp911de committed Aug 21, 2024
1 parent 8c7c308 commit 7d0a718
Show file tree
Hide file tree
Showing 11 changed files with 169 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* @author Greg Turnquist
* @since 4.0
*/
enum CassandraObservation implements ObservationDocumentation {
public enum CassandraObservation implements ObservationDocumentation {

/**
* Create an {@link io.micrometer.observation.Observation} for Cassandra-based queries.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ final class CqlSessionObservationInterceptor implements MethodInterceptor {

private final ObservationRegistry observationRegistry;

private final CassandraObservationConvention observationConvention = new DefaultCassandraObservationConvention();
private final CassandraObservationConvention convention;

CqlSessionObservationInterceptor(CqlSession delegate, String remoteServiceName,
ObservationRegistry observationRegistry) {
CassandraObservationConvention convention, ObservationRegistry observationRegistry) {

this.delegate = delegate;
this.remoteServiceName = remoteServiceName;
this.convention = convention;
this.observationRegistry = observationRegistry;
}

Expand Down Expand Up @@ -140,7 +141,7 @@ private static Statement<?> createStatement(Object[] args) {
return SimpleStatement.newInstance((String) args[0]);
}

if (args[0]instanceof String query && args.length == 2) {
if (args[0] instanceof String query && args.length == 2) {
return args[1] instanceof Map //
? SimpleStatement.newInstance(query, (Map) args[1]) //
: SimpleStatement.newInstance(query, (Object[]) args[1]);
Expand Down Expand Up @@ -179,7 +180,7 @@ private Observation startObservation(Statement<?> statement, boolean prepare, St
delegate.getContext().getSessionName(),
delegate.getKeyspace().map(CqlIdentifier::asInternal).orElse("system")),
observationRegistry)
.observationConvention(observationConvention);
.observationConvention(convention);

if (currentObservation != null) {
observation.parentObservation(currentObservation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
* @author Mark Paluch
* @since 4.0
*/
class DefaultCassandraObservationConvention implements CassandraObservationConvention {
public class DefaultCassandraObservationConvention implements CassandraObservationConvention {

public static final CassandraObservationConvention INSTANCE = new DefaultCassandraObservationConvention();

@Override
public KeyValues getLowCardinalityKeyValues(CassandraObservationContext context) {
Expand Down Expand Up @@ -104,10 +106,10 @@ public KeyValues getHighCardinalityKeyValues(CassandraObservationContext context
}

@Nullable
private InetSocketAddress tryGetSocketAddress(EndPoint endPoint) {
protected InetSocketAddress tryGetSocketAddress(EndPoint endPoint) {

try {
if (endPoint.resolve()instanceof InetSocketAddress inet) {
if (endPoint.resolve() instanceof InetSocketAddress inet) {
return inet;
}

Expand All @@ -121,13 +123,28 @@ public String getContextualName(CassandraObservationContext context) {
return (context.isPrepare() ? "PREPARE: " : "") + getOperationName(getCql(context.getStatement()), "");
}

/**
* Tries to parse the CQL query or provides the default name.
*
* @param defaultName if there's no query
* @return span name
*/
public String getOperationName(String cql, String defaultName) {

if (StringUtils.hasText(cql) && cql.indexOf(' ') > -1) {
return cql.substring(0, cql.indexOf(' '));
}

return defaultName;
}

/**
* Extract the CQL query from the delegate {@link Statement}.
*
* @return string-based CQL of the delegate
* @param statement
*/
private static String getCql(Statement<?> statement) {
protected static String getCql(Statement<?> statement) {

String query = "";

Expand Down Expand Up @@ -155,7 +172,7 @@ private static String getCql(Statement<?> statement) {
* @param statement
* @return query
*/
private static String getQuery(Statement<?> statement) {
protected static String getQuery(Statement<?> statement) {

if (statement instanceof SimpleStatement) {
return ((SimpleStatement) statement).getQuery();
Expand All @@ -167,19 +184,4 @@ private static String getQuery(Statement<?> statement) {

return "";
}

/**
* Tries to parse the CQL query or provides the default name.
*
* @param defaultName if there's no query
* @return span name
*/
public String getOperationName(String cql, String defaultName) {

if (StringUtils.hasText(cql) && cql.indexOf(' ') > -1) {
return cql.substring(0, cql.indexOf(' '));
}

return defaultName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import io.micrometer.observation.ObservationRegistry;

import org.springframework.aop.RawTargetAccess;
import org.springframework.aop.TargetSource;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.data.cassandra.observability.CqlSessionObservationInterceptor.ObservationDecoratedProxy;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -58,20 +56,36 @@ public static CqlSession wrap(CqlSession session, ObservationRegistry observatio
* @return
*/
public static CqlSession wrap(CqlSession session, String remoteServiceName, ObservationRegistry observationRegistry) {
return wrap(session, remoteServiceName, DefaultCassandraObservationConvention.INSTANCE, observationRegistry);
}

/**
* Wrap the {@link CqlSession} with a {@link CqlSessionObservationInterceptor}.
*
* @param session must not be {@literal null}.
* @param remoteServiceName must not be {@literal null}.
* @param convention the observation convention.
* @param observationRegistry must not be {@literal null}.
* @return
* @since 4.3.4
*/
public static CqlSession wrap(CqlSession session, String remoteServiceName, CassandraObservationConvention convention,
ObservationRegistry observationRegistry) {

Assert.notNull(session, "CqlSession must not be null");
Assert.notNull(remoteServiceName, "CqlSessionObservationConvention must not be null");
Assert.notNull(remoteServiceName, "Remote service name must not be null");
Assert.notNull(convention, "CassandraObservationConvention must not be null");
Assert.notNull(observationRegistry, "ObservationRegistry must not be null");

ProxyFactory proxyFactory = new ProxyFactory();

proxyFactory.setTarget(session);
proxyFactory.addAdvice(new CqlSessionObservationInterceptor(session, remoteServiceName, observationRegistry));
proxyFactory
.addAdvice(new CqlSessionObservationInterceptor(session, remoteServiceName, convention, observationRegistry));
proxyFactory.addInterface(CqlSession.class);
proxyFactory.addInterface(ObservationDecoratedProxy.class);

return (CqlSession) proxyFactory.getProxy();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class ObservableCqlSessionFactoryBean extends AbstractFactoryBean<CqlSess

private @Nullable String remoteServiceName;

private CassandraObservationConvention convention = DefaultCassandraObservationConvention.INSTANCE;

/**
* Construct a new {@link ObservableCqlSessionFactoryBean}.
*
Expand All @@ -58,16 +60,39 @@ public ObservableCqlSessionFactoryBean(CqlSessionBuilder cqlSessionBuilder, Obse
this.observationRegistry = observationRegistry;
}

@Nullable
public String getRemoteServiceName() {
return remoteServiceName;
}

/**
* Set the remote service name.
*
* @param remoteServiceName
*/
public void setRemoteServiceName(@Nullable String remoteServiceName) {
this.remoteServiceName = remoteServiceName;
}

/**
* Set the observation convention.
*
* @param convention
* @since 4.3.4
*/
public void setConvention(CassandraObservationConvention convention) {
this.convention = convention;
}

@Override
protected CqlSession createInstance() {

cqlSessionBuilder.addRequestTracker(ObservationRequestTracker.INSTANCE);

if (ObjectUtils.isEmpty(getRemoteServiceName())) {
return ObservableCqlSessionFactory.wrap(cqlSessionBuilder.build(), observationRegistry);
}
String remoteServiceName = ObjectUtils.isEmpty(getRemoteServiceName()) ? "Cassandra" : getRemoteServiceName();

return ObservableCqlSessionFactory.wrap(cqlSessionBuilder.build(), getRemoteServiceName(), observationRegistry);
return ObservableCqlSessionFactory.wrap(cqlSessionBuilder.build(), remoteServiceName, convention,
observationRegistry);
}

@Override
Expand All @@ -82,17 +107,4 @@ public Class<?> getObjectType() {
return CqlSession.class;
}

@Nullable
public String getRemoteServiceName() {
return remoteServiceName;
}

/**
* Set the remote service name.
*
* @param remoteServiceName
*/
public void setRemoteServiceName(@Nullable String remoteServiceName) {
this.remoteServiceName = remoteServiceName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@ public class ObservableReactiveSession implements ReactiveSession {

private final ObservationRegistry observationRegistry;

private final CassandraObservationConvention convention = new DefaultCassandraObservationConvention();
private final CassandraObservationConvention convention;

ObservableReactiveSession(ReactiveSession delegate, String remoteServiceName,
ObservationRegistry observationRegistry) {
CassandraObservationConvention convention, ObservationRegistry observationRegistry) {
this.delegate = delegate;
this.remoteServiceName = remoteServiceName;
this.convention = convention;
this.observationRegistry = observationRegistry;
}

Expand All @@ -67,19 +68,37 @@ public class ObservableReactiveSession implements ReactiveSession {
* @return traced representation of a {@link ReactiveSession}.
*/
public static ReactiveSession create(ReactiveSession session, ObservationRegistry observationRegistry) {
return new ObservableReactiveSession(session, "Cassandra", observationRegistry);
return new ObservableReactiveSession(session, "Cassandra", DefaultCassandraObservationConvention.INSTANCE,
observationRegistry);
}

/**
* Factory method for creation of a {@link ObservableReactiveSession}.
*
* @param session reactive session.
* @param remoteServiceName the remote service name.
* @param observationRegistry observation registry.
* @return traced representation of a {@link ReactiveSession}.
*/
public static ReactiveSession create(ReactiveSession session, String remoteServiceName,
ObservationRegistry observationRegistry) {
return new ObservableReactiveSession(session, remoteServiceName, observationRegistry);
return new ObservableReactiveSession(session, remoteServiceName, DefaultCassandraObservationConvention.INSTANCE,
observationRegistry);
}

/**
* Factory method for creation of a {@link ObservableReactiveSession}.
*
* @param session reactive session.
* @param remoteServiceName the remote service name.
* @param convention the observation convention.
* @param observationRegistry observation registry.
* @return traced representation of a {@link ReactiveSession}.
* @since 4.3.4
*/
public static ReactiveSession create(ReactiveSession session, String remoteServiceName,
CassandraObservationConvention convention, ObservationRegistry observationRegistry) {
return new ObservableReactiveSession(session, remoteServiceName, convention, observationRegistry);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,27 @@ public static ReactiveSession wrap(ReactiveSession session, ObservationRegistry
*/
public static ReactiveSession wrap(ReactiveSession session, String remoteServiceName,
ObservationRegistry observationRegistry) {
return wrap(session, remoteServiceName, DefaultCassandraObservationConvention.INSTANCE, observationRegistry);
}

/**
* Wrap the {@link CqlSession} with a {@link CqlSessionObservationInterceptor}.
*
* @param session must not be {@literal null}.
* @param remoteServiceName must not be {@literal null}.
* @param convention the observation convention.
* @param observationRegistry must not be {@literal null}.
* @return
* @since 4.3.4
*/
public static ReactiveSession wrap(ReactiveSession session, String remoteServiceName,
CassandraObservationConvention convention, ObservationRegistry observationRegistry) {

Assert.notNull(session, "CqlSession must not be null");
Assert.notNull(remoteServiceName, "CqlSessionObservationConvention must not be null");
Assert.notNull(remoteServiceName, "Remote service name must not be null");
Assert.notNull(convention, "CassandraObservationConvention must not be null");
Assert.notNull(observationRegistry, "ObservationRegistry must not be null");

return ObservableReactiveSession.create(session, remoteServiceName, observationRegistry);
return ObservableReactiveSession.create(session, remoteServiceName, convention, observationRegistry);
}
}
Loading

0 comments on commit 7d0a718

Please sign in to comment.