Skip to content

Commit

Permalink
Add Micrometer Observation support to Spring Dapr Messaging (#1150)
Browse files Browse the repository at this point in the history
* Add Micrometer Observation support to Spring Dapr Messaging

Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>

* Remove direct Micrometer deps it is part of Spring Boot

Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>

* Remove another explicit dependency

Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>

* Hide default observation convention implementation

Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>

* Fix typo in default message builder

Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>

* Ensure trace is properly sent using OTEL

Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>

---------

Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>
Co-authored-by: Artur Ciocanu <ciocanu@adobe.com>
  • Loading branch information
artur-ciocanu and Artur Ciocanu authored Nov 5, 2024
1 parent 0b7a051 commit be05a47
Show file tree
Hide file tree
Showing 11 changed files with 448 additions and 16 deletions.
4 changes: 2 additions & 2 deletions dapr-spring/dapr-spring-boot-autoconfigure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class DaprPubSubProperties {
* Name of the PubSub Dapr component.
*/
private String name;
private boolean observationEnabled;

public String getName() {
return name;
Expand All @@ -34,4 +35,11 @@ public void setName(String name) {
this.name = name;
}

public boolean isObservationEnabled() {
return observationEnabled;
}

public void setObservationEnabled(boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}
}
2 changes: 1 addition & 1 deletion dapr-spring/dapr-spring-messaging/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@
<artifactId>dapr-spring-messaging</artifactId>
<name>dapr-spring-messaging</name>
<description>Dapr Spring Messaging</description>
<packaging>jar</packaging>
<packaging>jar</packaging>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,104 @@

import io.dapr.client.DaprClient;
import io.dapr.client.domain.Metadata;
import io.dapr.spring.messaging.observation.DaprMessagingObservationConvention;
import io.dapr.spring.messaging.observation.DaprMessagingObservationDocumentation;
import io.dapr.spring.messaging.observation.DaprMessagingSenderContext;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.propagation.TextMapSetter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Map;

public class DaprMessagingTemplate<T> implements DaprMessagingOperations<T> {
/**
* Create a new DaprMessagingTemplate.
* @param <T> templated message type
*/
public class DaprMessagingTemplate<T> implements DaprMessagingOperations<T>, ApplicationContextAware, BeanNameAware,
SmartInitializingSingleton {

private static final Logger LOGGER = LoggerFactory.getLogger(DaprMessagingTemplate.class);
private static final String MESSAGE_TTL_IN_SECONDS = "10";
private static final DaprMessagingObservationConvention DEFAULT_OBSERVATION_CONVENTION =
DaprMessagingObservationConvention.getDefault();

private final DaprClient daprClient;
private final String pubsubName;
private final Map<String, String> metadata;
private final boolean observationEnabled;

@Nullable
private ApplicationContext applicationContext;

@Nullable
private String beanName;

@Nullable
private OpenTelemetry openTelemetry;

@Nullable
private ObservationRegistry observationRegistry;

public DaprMessagingTemplate(DaprClient daprClient, String pubsubName) {
@Nullable
private DaprMessagingObservationConvention observationConvention;

/**
* Constructs a new DaprMessagingTemplate.
* @param daprClient Dapr client
* @param pubsubName pubsub name
* @param observationEnabled whether to enable observations
*/
public DaprMessagingTemplate(DaprClient daprClient, String pubsubName, boolean observationEnabled) {
this.daprClient = daprClient;
this.pubsubName = pubsubName;
this.metadata = Map.of(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS);
this.observationEnabled = observationEnabled;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}

@Override
public void setBeanName(String beanName) {
this.beanName = beanName;
}

/**
* If observations are enabled, attempt to obtain the Observation registry and
* convention.
*/
@Override
public void afterSingletonsInstantiated() {
if (!observationEnabled) {
LOGGER.debug("Observations are not enabled - not recording");
return;
}

if (applicationContext == null) {
LOGGER.warn("Observations enabled but application context null - not recording");
return;
}

observationRegistry = applicationContext.getBeanProvider(ObservationRegistry.class)
.getIfUnique(() -> observationRegistry);
this.openTelemetry = this.applicationContext.getBeanProvider(OpenTelemetry.class)
.getIfUnique(() -> this.openTelemetry);
observationConvention = applicationContext.getBeanProvider(DaprMessagingObservationConvention.class)
.getIfUnique(() -> observationConvention);
}

@Override
Expand All @@ -38,29 +122,83 @@ public void send(String topic, T message) {

@Override
public SendMessageBuilder<T> newMessage(T message) {
return new SendMessageBuilderImpl<>(this, message);
return new DefaultSendMessageBuilder<>(this, message);
}

private void doSend(String topic, T message) {
doSendAsync(topic, message).block();
}

private Mono<Void> doSendAsync(String topic, T message) {
return daprClient.publishEvent(pubsubName,
topic,
message,
Map.of(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS));
LOGGER.trace("Sending message to '{}' topic", topic);

if (canUseObservation()) {
return publishEventWithObservation(pubsubName, topic, message);
}

return publishEvent(pubsubName, topic, message);
}

private boolean canUseObservation() {
return observationEnabled
&& observationRegistry != null
&& openTelemetry != null
&& beanName != null;
}

private Mono<Void> publishEvent(String pubsubName, String topic, T message) {
return daprClient.publishEvent(pubsubName, topic, message, metadata);
}

private Mono<Void> publishEventWithObservation(String pubsubName, String topic, T message) {
DaprMessagingSenderContext senderContext = DaprMessagingSenderContext.newContext(topic, this.beanName);
Observation observation = createObservation(senderContext);

return observation.observe(() ->
publishEvent(pubsubName, topic, message)
.contextWrite(getReactorContext())
.doOnError(err -> {
LOGGER.error("Failed to send msg to '{}' topic", topic, err);

observation.error(err);
observation.stop();
})
.doOnSuccess(ignore -> {
LOGGER.trace("Sent msg to '{}' topic", topic);

observation.stop();
})
);
}

private Context getReactorContext() {
Map<String, String> map = new HashMap<>();
TextMapSetter<Map<String, String>> setter = (carrier, key, value) -> map.put(key, value);
io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context.current();

openTelemetry.getPropagators().getTextMapPropagator().inject(otelContext, map, setter);

return Context.of(map);
}

private Observation createObservation(DaprMessagingSenderContext senderContext) {
return DaprMessagingObservationDocumentation.TEMPLATE_OBSERVATION.observation(
observationConvention,
DEFAULT_OBSERVATION_CONVENTION,
() -> senderContext,
observationRegistry
);
}

private static class SendMessageBuilderImpl<T> implements SendMessageBuilder<T> {
private static class DefaultSendMessageBuilder<T> implements SendMessageBuilder<T> {

private final DaprMessagingTemplate<T> template;

private final T message;

private String topic;

SendMessageBuilderImpl(DaprMessagingTemplate<T> template, T message) {
DefaultSendMessageBuilder(DaprMessagingTemplate<T> template, T message) {
this.template = template;
this.message = message;
}
Expand All @@ -74,12 +212,12 @@ public SendMessageBuilder<T> withTopic(String topic) {

@Override
public void send() {
this.template.doSend(this.topic, this.message);
template.doSend(topic, message);
}

@Override
public Mono<Void> sendAsync() {
return this.template.doSendAsync(this.topic, this.message);
return template.doSendAsync(topic, message);
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.spring.messaging.observation;

import io.micrometer.observation.Observation.Context;
import io.micrometer.observation.ObservationConvention;

/**
* {@link ObservationConvention} for Dapr Messaging.
*
*/
public interface DaprMessagingObservationConvention extends ObservationConvention<DaprMessagingSenderContext> {

@Override
default boolean supportsContext(Context context) {
return context instanceof DaprMessagingSenderContext;
}

@Override
default String getName() {
return "spring.dapr.messaging.template";
}

static DaprMessagingObservationConvention getDefault() {
return DefaultDaprMessagingObservationConvention.INSTANCE;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.spring.messaging.observation;

import io.micrometer.common.docs.KeyName;
import io.micrometer.observation.Observation;
import io.micrometer.observation.Observation.Context;
import io.micrometer.observation.ObservationConvention;
import io.micrometer.observation.docs.ObservationDocumentation;

/**
* An {@link Observation} for {@link io.dapr.spring.messaging.DaprMessagingTemplate}.
*
*/
public enum DaprMessagingObservationDocumentation implements ObservationDocumentation {

/**
* Observation created when a Dapr template sends a message.
*/
TEMPLATE_OBSERVATION {

@Override
public Class<? extends ObservationConvention<? extends Context>> getDefaultConvention() {
return DefaultDaprMessagingObservationConvention.class;
}

@Override
public String getPrefix() {
return "spring.dapr.messaging.template";
}

@Override
public KeyName[] getLowCardinalityKeyNames() {
return TemplateLowCardinalityTags.values();
}
};

/**
* Low cardinality tags.
*/
public enum TemplateLowCardinalityTags implements KeyName {
/**
* Bean name of the template that sent the message.
*/
BEAN_NAME {

@Override
public String asString() {
return "spring.dapr.messaging.template.name";
}
}
}
}
Loading

0 comments on commit be05a47

Please sign in to comment.