Skip to content

Commit

Permalink
Enhance mongodb otel integration
Browse files Browse the repository at this point in the history
Continue PR quarkusio#40191

- Add docs
- Add tests
- Fix parent-child spans for reactive request
  • Loading branch information
vkn committed May 18, 2024
1 parent 6cf3b7e commit 45a41f0
Show file tree
Hide file tree
Showing 13 changed files with 380 additions and 39 deletions.
12 changes: 12 additions & 0 deletions docs/src/main/asciidoc/mongodb.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,18 @@ This behavior must first be enabled by setting the `quarkus.mongodb.metrics.enab
So when you access the `/q/metrics` endpoint of your application you will have information about the connection pool status.
When using xref:smallrye-metrics.adoc[SmallRye Metrics], connection pool metrics will be available under the `vendor` scope.


== Tracing

To use tracing with mongo, you need to add the xref:opentelemetry.adoc[`quarkus-opentelemetry`] extension to your project.

Even with all the tracing infrastructure in place the mongodb tracing is not enabled by default, and you need to enable it by setting this property:
[source, properties]
----
# enable tracing
quarkus.mongodb.tracing.enabled=true
----

== Testing helpers

xref:#dev-services[Dev Services for MongoDB] is your best option to start a MongoDB database for your unit tests.
Expand Down
1 change: 1 addition & 0 deletions docs/src/main/asciidoc/opentelemetry.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ Additional exporters will be available in the Quarkiverse https://docs.quarkiver
* https://quarkus.io/guides/resteasy-client[`quarkus-resteasy-client`]
* https://quarkus.io/guides/scheduler[`quarkus-scheduler`]
* https://quarkus.io/guides/smallrye-graphql[`quarkus-smallrye-graphql`]
* https://quarkus.io/extensions/io.quarkus/quarkus-mongodb-client[`quarkus-mongodb-client`]
* https://quarkus.io/extensions/io.quarkus/quarkus-messaging[`quarkus-messaging`]
** AMQP 1.0
** RabbitMQ
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.quarkus.mongodb.deployment;

import java.util.List;

import com.mongodb.reactivestreams.client.ReactiveContextProvider;

import io.quarkus.builder.item.SimpleBuildItem;

/**
* Register additional {@link ReactiveContextProvider}s for the MongoDB clients.
*/
public final class ContextProviderBuildItem extends SimpleBuildItem {

private final List<String> classNames;

public ContextProviderBuildItem(List<String> classNames) {
this.classNames = classNames;
}

public List<String> getContextProviderClassNames() {
return classNames;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.mongodb.client.model.changestream.UpdateDescription;
import com.mongodb.event.CommandListener;
import com.mongodb.event.ConnectionPoolListener;
import com.mongodb.reactivestreams.client.ReactiveContextProvider;
import com.mongodb.spi.dns.DnsClientProvider;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
Expand Down Expand Up @@ -69,6 +70,7 @@
import io.quarkus.mongodb.runtime.MongoClientRecorder;
import io.quarkus.mongodb.runtime.MongoClientSupport;
import io.quarkus.mongodb.runtime.MongoClients;
import io.quarkus.mongodb.runtime.MongoReactiveContextProvider;
import io.quarkus.mongodb.runtime.MongoServiceBindingConverter;
import io.quarkus.mongodb.runtime.MongodbConfig;
import io.quarkus.mongodb.runtime.dns.MongoDnsClient;
Expand Down Expand Up @@ -113,9 +115,11 @@ AdditionalIndexedClassesBuildItem includeDnsTypesToIndex() {
}

@BuildStep
AdditionalIndexedClassesBuildItem includeDnsTypesToIndex(MongoClientBuildTimeConfig buildTimeConfig) {
AdditionalIndexedClassesBuildItem includeMongoCommandListener(MongoClientBuildTimeConfig buildTimeConfig) {
if (buildTimeConfig.tracingEnabled) {
return new AdditionalIndexedClassesBuildItem(MongoTracingCommandListener.class.getName());
return new AdditionalIndexedClassesBuildItem(
MongoTracingCommandListener.class.getName(),
MongoReactiveContextProvider.class.getName());
}
return new AdditionalIndexedClassesBuildItem();
}
Expand Down Expand Up @@ -161,15 +165,27 @@ CommandListenerBuildItem collectCommandListeners(CombinedIndexBuildItem indexBui
return new CommandListenerBuildItem(names);
}

@BuildStep
ContextProviderBuildItem collectContextProviders(CombinedIndexBuildItem indexBuildItem) {
Collection<ClassInfo> contextProviders = indexBuildItem.getIndex()
.getAllKnownImplementors(DotName.createSimple(ReactiveContextProvider.class.getName()));
List<String> names = contextProviders.stream()
.map(ci -> ci.name().toString())
.collect(Collectors.toList());
return new ContextProviderBuildItem(names);
}

@BuildStep
List<ReflectiveClassBuildItem> addExtensionPointsToNative(CodecProviderBuildItem codecProviders,
PropertyCodecProviderBuildItem propertyCodecProviders, BsonDiscriminatorBuildItem bsonDiscriminators,
CommandListenerBuildItem commandListeners) {
CommandListenerBuildItem commandListeners,
ContextProviderBuildItem contextProviders) {
List<String> reflectiveClassNames = new ArrayList<>();
reflectiveClassNames.addAll(codecProviders.getCodecProviderClassNames());
reflectiveClassNames.addAll(propertyCodecProviders.getPropertyCodecProviderClassNames());
reflectiveClassNames.addAll(bsonDiscriminators.getBsonDiscriminatorClassNames());
reflectiveClassNames.addAll(commandListeners.getCommandListenerClassNames());
reflectiveClassNames.addAll(contextProviders.getContextProviderClassNames());

List<ReflectiveClassBuildItem> reflectiveClass = reflectiveClassNames.stream()
.map(s -> ReflectiveClassBuildItem.builder(s).methods().build())
Expand Down Expand Up @@ -256,6 +272,7 @@ void build(
PropertyCodecProviderBuildItem propertyCodecProvider,
BsonDiscriminatorBuildItem bsonDiscriminator,
CommandListenerBuildItem commandListener,
ContextProviderBuildItem contextProvider,
List<MongoConnectionPoolListenerBuildItem> connectionPoolListenerProvider,
BuildProducer<AdditionalBeanBuildItem> additionalBeanBuildItemProducer,
BuildProducer<SyntheticBeanBuildItem> syntheticBeanBuildItemBuildProducer) {
Expand All @@ -277,6 +294,9 @@ void build(
for (String name : commandListener.getCommandListenerClassNames()) {
additionalBeansBuilder.addBeanClass(name);
}
for (String name : contextProvider.getContextProviderClassNames()) {
additionalBeansBuilder.addBeanClass(name);
}
additionalBeanBuildItemProducer.produce(additionalBeansBuilder.build());

// create MongoClientSupport as a synthetic bean as it's used in AbstractMongoClientProducer
Expand Down
5 changes: 5 additions & 0 deletions extensions/mongodb-client/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@
<artifactId>quarkus-kubernetes-service-binding</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-opentelemetry</artifactId>
<optional>true</optional>
</dependency>

<!-- Add the health extension as optional as we will produce the health check only if it's included -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.mongodb.connection.SslSettings;
import com.mongodb.event.CommandListener;
import com.mongodb.event.ConnectionPoolListener;
import com.mongodb.reactivestreams.client.ReactiveContextProvider;

import io.quarkus.arc.Arc;
import io.quarkus.arc.InstanceHandle;
Expand Down Expand Up @@ -86,18 +87,21 @@ public class MongoClients {

private final Map<String, MongoClient> mongoclients = new HashMap<>();
private final Map<String, ReactiveMongoClient> reactiveMongoClients = new HashMap<>();
private final Instance<ReactiveContextProvider> reactiveContextProviders;
private final Instance<MongoClientCustomizer> customizers;

public MongoClients(MongodbConfig mongodbConfig, MongoClientSupport mongoClientSupport,
Instance<CodecProvider> codecProviders,
Instance<PropertyCodecProvider> propertyCodecProviders,
Instance<CommandListener> commandListeners,
Instance<ReactiveContextProvider> reactiveContextProviders,
@Any Instance<MongoClientCustomizer> customizers) {
this.mongodbConfig = mongodbConfig;
this.mongoClientSupport = mongoClientSupport;
this.codecProviders = codecProviders;
this.propertyCodecProviders = propertyCodecProviders;
this.commandListeners = commandListeners;
this.reactiveContextProviders = reactiveContextProviders;
this.customizers = customizers;

try {
Expand All @@ -121,15 +125,17 @@ public MongoClients(MongodbConfig mongodbConfig, MongoClientSupport mongoClientS
}

public MongoClient createMongoClient(String clientName) throws MongoException {
MongoClientSettings mongoConfiguration = createMongoConfiguration(clientName, getMatchingMongoClientConfig(clientName));
MongoClientSettings mongoConfiguration = createMongoConfiguration(clientName, getMatchingMongoClientConfig(clientName),
false);
MongoClient client = com.mongodb.client.MongoClients.create(mongoConfiguration);
mongoclients.put(clientName, client);
return client;
}

public ReactiveMongoClient createReactiveMongoClient(String clientName)
throws MongoException {
MongoClientSettings mongoConfiguration = createMongoConfiguration(clientName, getMatchingMongoClientConfig(clientName));
MongoClientSettings mongoConfiguration = createMongoConfiguration(clientName, getMatchingMongoClientConfig(clientName),
true);
com.mongodb.reactivestreams.client.MongoClient client = com.mongodb.reactivestreams.client.MongoClients
.create(mongoConfiguration);
ReactiveMongoClientImpl reactive = new ReactiveMongoClientImpl(client);
Expand Down Expand Up @@ -254,14 +260,18 @@ public void apply(ServerSettings.Builder builder) {
}
}

private MongoClientSettings createMongoConfiguration(String name, MongoClientConfig config) {
private MongoClientSettings createMongoConfiguration(String name, MongoClientConfig config, boolean isReactive) {
if (config == null) {
throw new RuntimeException("mongo config is missing for creating mongo client.");
}
CodecRegistry defaultCodecRegistry = MongoClientSettings.getDefaultCodecRegistry();

MongoClientSettings.Builder settings = MongoClientSettings.builder();

if (isReactive) {
reactiveContextProviders.stream().findAny().ifPresent(settings::contextProvider);
}

ConnectionString connectionString;
Optional<String> maybeConnectionString = config.connectionString;
if (maybeConnectionString.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.quarkus.mongodb.runtime;

import org.reactivestreams.Subscriber;

import com.mongodb.RequestContext;
import com.mongodb.reactivestreams.client.ReactiveContextProvider;

import io.opentelemetry.context.Context;

public class MongoReactiveContextProvider implements ReactiveContextProvider {

@Override
public RequestContext getContext(Subscriber<?> subscriber) {
return new MongoRequestContext(Context.current());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.quarkus.mongodb.runtime;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

import com.mongodb.RequestContext;

import io.opentelemetry.context.Context;

@SuppressWarnings("unchecked")
public class MongoRequestContext implements RequestContext {
public static final String OTEL_CONTEXT_KEY = "otel.context.current";
private final Map<Object, Object> valuesMap;

MongoRequestContext(Context currentContext) {
valuesMap = new ConcurrentHashMap<>();
valuesMap.put(OTEL_CONTEXT_KEY, currentContext);
}

@Override
public <T> T get(Object key) {
return (T) valuesMap.get(key);
}

@Override
public boolean hasKey(Object key) {
return valuesMap.containsKey(key);
}

@Override
public boolean isEmpty() {
return valuesMap.isEmpty();
}

@Override
public void put(Object key, Object value) {
valuesMap.put(key, value);
}

@Override
public void delete(Object key) {
valuesMap.remove(key);
}

@Override
public int size() {
return valuesMap.size();
}

@Override
public Stream<Map.Entry<Object, Object>> stream() {
return valuesMap.entrySet().stream();
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package io.quarkus.mongodb.tracing;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.Nullable;

import jakarta.inject.Inject;

import org.bson.BsonDocument;
import org.jboss.logging.Logger;

import com.mongodb.event.*;
import com.mongodb.event.CommandFailedEvent;
import com.mongodb.event.CommandListener;
import com.mongodb.event.CommandStartedEvent;
import com.mongodb.event.CommandSucceededEvent;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributesBuilder;
Expand All @@ -18,21 +23,25 @@
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.quarkus.mongodb.runtime.MongoRequestContext;

public class MongoTracingCommandListener implements CommandListener {
private static final org.jboss.logging.Logger LOGGER = Logger.getLogger(MongoTracingCommandListener.class);
private static final String KEY = "mongodb.command";
private final Map<Integer, ContextEvent> requestMap;
private final Instrumenter<CommandStartedEvent, Void> instrumenter;
private final Instrumenter<MongoCommand, Void> instrumenter;

private record ContextEvent(Context context, CommandStartedEvent commandEvent) {
private record MongoCommand(String name, BsonDocument command) {
}

private record ContextEvent(Context context, MongoCommand command) {
}

@Inject
public MongoTracingCommandListener(OpenTelemetry openTelemetry) {
requestMap = new ConcurrentHashMap<>();
SpanNameExtractor<CommandStartedEvent> spanNameExtractor = CommandEvent::getCommandName;
instrumenter = Instrumenter.<CommandStartedEvent, Void> builder(
SpanNameExtractor<MongoCommand> spanNameExtractor = MongoCommand::name;
instrumenter = Instrumenter.<MongoCommand, Void> builder(
openTelemetry, "quarkus-mongodb-client", spanNameExtractor)
.addAttributesExtractor(new CommandEventAttrExtractor())
.buildInstrumenter(SpanKindExtractor.alwaysClient());
Expand All @@ -43,10 +52,13 @@ public MongoTracingCommandListener(OpenTelemetry openTelemetry) {
public void commandStarted(CommandStartedEvent event) {
LOGGER.tracef("commandStarted event %s", event.getCommandName());

Context parentContext = Context.current();
if (instrumenter.shouldStart(parentContext, event)) {
Context context = instrumenter.start(parentContext, event);
requestMap.put(event.getRequestId(), new ContextEvent(context, event));
Context parentContext = Optional.ofNullable(event.getRequestContext())
.map(rc -> (Context) rc.get(MongoRequestContext.OTEL_CONTEXT_KEY))
.orElseGet(Context::current);
var mongoCommand = new MongoCommand(event.getCommandName(), event.getCommand());
if (instrumenter.shouldStart(parentContext, mongoCommand)) {
Context context = instrumenter.start(parentContext, mongoCommand);
requestMap.put(event.getRequestId(), new ContextEvent(context, mongoCommand));
}
}

Expand All @@ -55,7 +67,7 @@ public void commandSucceeded(CommandSucceededEvent event) {
LOGGER.tracef("commandSucceeded event %s", event.getCommandName());
ContextEvent contextEvent = requestMap.remove(event.getRequestId());
if (contextEvent != null) {
instrumenter.end(contextEvent.context(), contextEvent.commandEvent(), null, null);
instrumenter.end(contextEvent.context(), contextEvent.command(), null, null);
}
}

Expand All @@ -66,27 +78,26 @@ public void commandFailed(CommandFailedEvent event) {
if (contextEvent != null) {
instrumenter.end(
contextEvent.context(),
contextEvent.commandEvent(),
contextEvent.command(),
null,
event.getThrowable());
}
}

private static class CommandEventAttrExtractor implements AttributesExtractor<CommandStartedEvent, Void> {
private static class CommandEventAttrExtractor implements AttributesExtractor<MongoCommand, Void> {

@Override
public void onStart(AttributesBuilder attributesBuilder,
Context context,
CommandStartedEvent commandStartedEvent) {
attributesBuilder.put(KEY, commandStartedEvent.getCommand().toJson());
public void onStart(AttributesBuilder attributesBuilder, Context context, MongoCommand command) {
attributesBuilder.put(KEY, command.command().toJson());
}

@Override
public void onEnd(AttributesBuilder attributesBuilder,
public void onEnd(
AttributesBuilder attributesBuilder,
Context context,
CommandStartedEvent commandStartedEvent,
MongoCommand command,
@Nullable Void unused,
@Nullable Throwable throwable) {

}
}
}
Loading

0 comments on commit 45a41f0

Please sign in to comment.