Skip to content

Commit

Permalink
Add mongo commands to otel span attributes
Browse files Browse the repository at this point in the history
See #28473
  • Loading branch information
vkn authored and gsmet committed Apr 26, 2024
1 parent 4fe7808 commit 8e551cf
Show file tree
Hide file tree
Showing 14 changed files with 705 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ public class MongoClientBuildTimeConfig {
@ConfigItem(name = "force-default-clients")
public boolean forceDefaultClients;

/**
* Whether or not tracing spans of driver commands are sent in case the quarkus-opentelemetry extension is present.
*/
@ConfigItem(name = "tracing.enabled")
public boolean tracingEnabled;

/**
* Configuration for DevServices. DevServices allows Quarkus to automatically start MongoDB in dev and test mode.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Default;
Expand Down Expand Up @@ -85,6 +86,8 @@ public class MongoClientProcessor {

private static final DotName MONGO_CLIENT_CUSTOMIZER = DotName.createSimple(MongoClientCustomizer.class.getName());

private static final String MONGODB_TRACING_COMMANDLISTENER_CLASSNAME = "io.quarkus.mongodb.tracing.MongoTracingCommandListener";

private static final String SERVICE_BINDING_INTERFACE_NAME = "io.quarkus.kubernetes.service.binding.runtime.ServiceBindingConverter";

@BuildStep
Expand Down Expand Up @@ -147,10 +150,14 @@ CommandListenerBuildItem collectCommandListeners(CombinedIndexBuildItem indexBui
MongoClientBuildTimeConfig buildTimeConfig, Capabilities capabilities) {
Collection<ClassInfo> commandListenerClasses = indexBuildItem.getIndex()
.getAllKnownImplementors(DotName.createSimple(CommandListener.class.getName()));
List<String> names = commandListenerClasses.stream()
.map(ci -> ci.name().toString())
.collect(Collectors.toList());
return new CommandListenerBuildItem(names);
Stream<String> names = commandListenerClasses.stream()
.map(ci -> ci.name().toString());
Stream<String> tracing = Stream.empty();
if (buildTimeConfig.tracingEnabled && capabilities.isPresent(Capability.OPENTELEMETRY_TRACER)) {
tracing = Stream.of(MONGODB_TRACING_COMMANDLISTENER_CLASSNAME);
}
var items = Stream.concat(names, tracing).toList();
return new CommandListenerBuildItem(items);
}

@BuildStep
Expand Down
5 changes: 5 additions & 0 deletions extensions/mongodb-client/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mutiny-reactive-streams-operators</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-opentelemetry</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package io.quarkus.mongodb.tracing;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.Nullable;

import jakarta.inject.Inject;

import org.jboss.logging.Logger;

import com.mongodb.event.*;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;

public class MongoTracingCommandListener implements CommandListener {
private static final org.jboss.logging.Logger LOGGER = Logger.getLogger(MongoTracingCommandListener.class);
private static final String KEY = "mongodb.command";
private final Map<Integer, ContextEvent> requestMap;
private final Instrumenter<CommandStartedEvent, Void> instrumenter;

private record ContextEvent(Context context, CommandStartedEvent commandEvent) {
}

@Inject
public MongoTracingCommandListener(OpenTelemetry openTelemetry) {
requestMap = new ConcurrentHashMap<>();
SpanNameExtractor<CommandStartedEvent> spanNameExtractor = CommandEvent::getCommandName;
instrumenter = Instrumenter.<CommandStartedEvent, Void> builder(
openTelemetry, "quarkus-mongodb-client", spanNameExtractor)
.addAttributesExtractor(new CommandEventAttrExtractor())
.buildInstrumenter(SpanKindExtractor.alwaysClient());
LOGGER.debugf("MongoTracingCommandListener created");
}

@Override
public void commandStarted(CommandStartedEvent event) {
LOGGER.tracef("commandStarted event %s", event.getCommandName());

Context parentContext = Context.current();
if (instrumenter.shouldStart(parentContext, event)) {
Context context = instrumenter.start(parentContext, event);
requestMap.put(event.getRequestId(), new ContextEvent(context, event));
}
}

@Override
public void commandSucceeded(CommandSucceededEvent event) {
LOGGER.tracef("commandSucceeded event %s", event.getCommandName());
ContextEvent contextEvent = requestMap.remove(event.getRequestId());
if (contextEvent != null) {
instrumenter.end(contextEvent.context(), contextEvent.commandEvent(), null, null);
}
}

@Override
public void commandFailed(CommandFailedEvent event) {
LOGGER.tracef("commandFailed event %s", event.getCommandName());
ContextEvent contextEvent = requestMap.remove(event.getRequestId());
if (contextEvent != null) {
instrumenter.end(
contextEvent.context(),
contextEvent.commandEvent(),
null,
event.getThrowable());
}
}

private static class CommandEventAttrExtractor implements AttributesExtractor<CommandStartedEvent, Void> {
@Override
public void onStart(AttributesBuilder attributesBuilder,
Context context,
CommandStartedEvent commandStartedEvent) {
attributesBuilder.put(KEY, commandStartedEvent.getCommand().toJson());
}

@Override
public void onEnd(AttributesBuilder attributesBuilder,
Context context,
CommandStartedEvent commandStartedEvent,
@Nullable Void unused,
@Nullable Throwable throwable) {

}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package io.quarkus.mongodb.tracing;

import static org.assertj.core.api.Assertions.assertThatNoException;

import org.bson.BsonDocument;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import com.mongodb.ServerAddress;
import com.mongodb.connection.ClusterId;
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.connection.ServerId;
import com.mongodb.event.CommandFailedEvent;
import com.mongodb.event.CommandStartedEvent;
import com.mongodb.event.CommandSucceededEvent;

import io.opentelemetry.api.OpenTelemetry;

class MongoTracingCommandListenerTest {
private ConnectionDescription connDescr;
private MongoTracingCommandListener listener;
private BsonDocument command;

@BeforeEach
void setUp() {
connDescr = new ConnectionDescription(new ServerId(new ClusterId(), new ServerAddress()));
listener = new MongoTracingCommandListener(OpenTelemetry.noop());
command = new BsonDocument();
}

@Test
void commandStarted() {
var startEvent = new CommandStartedEvent(
null,
1L,
10,
connDescr,
"db",
"find",
command);
assertThatNoException().isThrownBy(() -> listener.commandStarted(startEvent));

CommandSucceededEvent successEvent = new CommandSucceededEvent(null,
startEvent.getOperationId(),
startEvent.getRequestId(),
connDescr,
startEvent.getDatabaseName(),
startEvent.getCommandName(),
startEvent.getCommand(),
10L);
assertThatNoException().isThrownBy(() -> listener.commandSucceeded(successEvent));
}

@Test
void commandSucceeded() {
CommandSucceededEvent cmd = new CommandSucceededEvent(null,
1L,
10,
connDescr,
"db",
"find",
command,
10L);
assertThatNoException().isThrownBy(() -> listener.commandSucceeded(cmd));
}

@Test
void commandFailed() {
var startedEvent = new CommandStartedEvent(
null,
1L,
10,
connDescr,
"db",
"find",
command);
assertThatNoException().isThrownBy(() -> listener.commandStarted(startedEvent));

CommandFailedEvent failedEvent = new CommandFailedEvent(null,
1L,
10,
connDescr,
"db",
"find",
10L,
new IllegalStateException("command failed"));
assertThatNoException().isThrownBy(() -> listener.commandFailed(failedEvent));
}

@Test
void commandFailedNoEvent() {
CommandFailedEvent cmd = new CommandFailedEvent(null,
1L,
10,
connDescr,
"db",
"find",
10L,
new IllegalStateException("command failed"));
assertThatNoException().isThrownBy(() -> listener.commandFailed(cmd));
}

}
Loading

0 comments on commit 8e551cf

Please sign in to comment.