Skip to content

Commit

Permalink
latest
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Jun 20, 2024
1 parent f62c4be commit 20b5d25
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public Flux<PartitionEvent> syncReceive(Flux<PartitionEvent> events, String part
startOptions.setStartTimestamp(Instant.now());
}

return createScope((m, s) -> meter.reportReceiveDuration(receivedCount[0], partitionId, s));
return createScope((m, s) -> meter.reportReceive(receivedCount[0], partitionId, s));
},
scope -> events
.doOnNext(partitionEvent -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.messaging.eventhubs.implementation.instrumentation;

import com.azure.core.util.TelemetryAttributes;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.metrics.DoubleHistogram;
import com.azure.core.util.metrics.LongCounter;
import com.azure.core.util.metrics.Meter;
Expand All @@ -19,16 +20,18 @@
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_CLIENT_CONSUMED_MESSAGES;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_CLIENT_OPERATION_DURATION;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_CLIENT_PUBLISHED_MESSAGES;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_CONSUMER_PROCESS_DURATION;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_DESTINATION_NAME;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_CONSUMER_GROUP_NAME;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_EVENTHUBS_CONSUMER_LAG;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_DESTINATION_PARTITION_ID;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_OPERATION_NAME;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_OPERATION_TYPE;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_PROCESS_DURATION;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_SYSTEM;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_SYSTEM_VALUE;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.SERVER_ADDRESS;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.getDurationInSeconds;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.getOperationType;
import static com.azure.messaging.eventhubs.implementation.instrumentation.OperationName.CHECKPOINT;
import static com.azure.messaging.eventhubs.implementation.instrumentation.OperationName.PROCESS;
import static com.azure.messaging.eventhubs.implementation.instrumentation.OperationName.RECEIVE;
Expand All @@ -37,6 +40,7 @@
public class EventHubsMetricsProvider {
private final Meter meter;
private final boolean isEnabled;
private static final ClientLogger LOGGER = new ClientLogger(EventHubsMetricsProvider.class);
private Map<String, Object> commonAttributes;
private AttributeCache sendAttributeCacheSuccess;
private AttributeCache receiveAttributeCacheSuccess;
Expand All @@ -53,31 +57,22 @@ public EventHubsMetricsProvider(Meter meter, String namespace, String entityName
this.isEnabled = meter != null && meter.isEnabled();
if (this.isEnabled) {
this.commonAttributes = getCommonAttributes(namespace, entityName, consumerGroup);
this.sendAttributeCacheSuccess = createAttributeCache(meter, SEND, commonAttributes);
this.receiveAttributeCacheSuccess = createAttributeCache(meter, RECEIVE, commonAttributes);
this.checkpointAttributeCacheSuccess = createAttributeCache(meter, CHECKPOINT, commonAttributes);
this.processAttributeCacheSuccess = createAttributeCache(meter, PROCESS, commonAttributes);
this.sendAttributeCacheSuccess = AttributeCache.create(meter, SEND, commonAttributes);
this.receiveAttributeCacheSuccess = AttributeCache.create(meter, RECEIVE, commonAttributes);
this.checkpointAttributeCacheSuccess = AttributeCache.create(meter, CHECKPOINT, commonAttributes);
this.processAttributeCacheSuccess = AttributeCache.create(meter, PROCESS, commonAttributes);
this.lagAttributeCache = new AttributeCache(meter, MESSAGING_DESTINATION_PARTITION_ID, commonAttributes);

this.publishedEventCounter = meter.createLongCounter(MESSAGING_CLIENT_PUBLISHED_MESSAGES, "The number of published events", "{event}");
this.consumedEventCounter = meter.createLongCounter(MESSAGING_CLIENT_CONSUMED_MESSAGES, "The number of consumed events", "{event}");

this.operationDuration = meter.createDoubleHistogram(MESSAGING_CLIENT_OPERATION_DURATION, "The duration of client messaging operations involving communication with the Event Hubs namespace", "s");
this.processDuration = meter.createDoubleHistogram(MESSAGING_CONSUMER_PROCESS_DURATION, "The duration of the processing callback", "s");
this.processDuration = meter.createDoubleHistogram(MESSAGING_PROCESS_DURATION, "The duration of the processing callback", "s");

this.consumerLag = meter.createDoubleHistogram(MESSAGING_EVENTHUBS_CONSUMER_LAG, "Difference between local time when event was received and the local time it was enqueued on broker", "s");
}
}

private static AttributeCache createAttributeCache(
Meter meter,
OperationName operationName,
Map<String, Object> commonAttributes) {
Map<String, Object> attributes = new HashMap<>(commonAttributes);
attributes.put(MESSAGING_OPERATION_NAME, operationName.toString());
return new AttributeCache(meter, MESSAGING_DESTINATION_PARTITION_ID, attributes);
}

public boolean isEnabled() {
return isEnabled;
}
Expand All @@ -98,7 +93,7 @@ public void reportProcess(int batchSize, String partitionId, InstrumentationScop
}
}

public void reportReceiveDuration(int receivedCount, String partitionId, InstrumentationScope scope) {
public void reportReceive(int receivedCount, String partitionId, InstrumentationScope scope) {
if (isEnabled && (operationDuration.isEnabled() || consumedEventCounter.isEnabled())) {
String errorType = scope.getErrorType();
TelemetryAttributes attributes = getOrCreateAttributes(RECEIVE, partitionId, errorType);
Expand Down Expand Up @@ -136,6 +131,12 @@ private TelemetryAttributes getOrCreateAttributes(OperationName operationName, S
return checkpointAttributeCacheSuccess.getOrCreate(partitionId);
case PROCESS:
return processAttributeCacheSuccess.getOrCreate(partitionId);
default:
LOGGER.atVerbose()
.addKeyValue("operationName", operationName)
.log("Unknown operation name");
// this should never happen
return lagAttributeCache.getOrCreate(partitionId);
}
}

Expand All @@ -145,7 +146,8 @@ private TelemetryAttributes getOrCreateAttributes(OperationName operationName, S
if (partitionId != null) {
attributes.put(MESSAGING_DESTINATION_PARTITION_ID, partitionId);
}
attributes.put(MESSAGING_OPERATION_NAME, operationName.toString());

setOperation(attributes, operationName);
attributes.put(ERROR_TYPE, errorType);
return meter.createAttributes(attributes);
}
Expand All @@ -162,14 +164,29 @@ private Map<String, Object> getCommonAttributes(String namespace, String entityN
return Collections.unmodifiableMap(commonAttributesMap);
}

static class AttributeCache {
private static void setOperation(Map<String, Object> attributes, OperationName name) {
String operationType = getOperationType(name);
if (operationType != null) {
attributes.put(MESSAGING_OPERATION_TYPE, operationType);
}

attributes.put(MESSAGING_OPERATION_NAME, name.toString());
}

private static final class AttributeCache {
private final Map<String, TelemetryAttributes> attr = new ConcurrentHashMap<>();
private final TelemetryAttributes commonAttr;
private final Map<String, Object> commonMap;
private final String dimensionName;
private final Meter meter;

AttributeCache(Meter meter, String dimensionName, Map<String, Object> common) {
static AttributeCache create(Meter meter, OperationName operationName, Map<String, Object> commonAttributes) {
Map<String, Object> attributes = new HashMap<>(commonAttributes);
setOperation(attributes, operationName);
return new AttributeCache(meter, MESSAGING_DESTINATION_PARTITION_ID, attributes);
}

private AttributeCache(Meter meter, String dimensionName, Map<String, Object> common) {
this.dimensionName = dimensionName;
this.commonMap = common;
this.meter = meter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_DESTINATION_PARTITION_ID;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_EVENTHUBS_MESSAGE_ENQUEUED_TIME;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_OPERATION_NAME;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_OPERATION_TYPE;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_SYSTEM;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_SYSTEM_VALUE;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.SERVER_ADDRESS;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.TRACEPARENT_KEY;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.getErrorType;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.getOperationType;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.unwrap;
import static com.azure.messaging.eventhubs.implementation.instrumentation.OperationName.EVENT;
import static com.azure.messaging.eventhubs.implementation.instrumentation.OperationName.PROCESS;
Expand Down Expand Up @@ -236,6 +238,11 @@ public StartSpanOptions createStartOptions(SpanKind kind, OperationName operatio
startOptions.setAttribute(MESSAGING_DESTINATION_PARTITION_ID, partitionId);
}

String operationType = getOperationType(operationName);
if (operationType != null) {
startOptions.setAttribute(MESSAGING_OPERATION_TYPE, operationType);
}

return startOptions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public InstrumentationScope setSpan(Context span) {
}

public InstrumentationScope setCancelled() {
// Complicated calls can result in can result in error followed by cancellation. We shouldn't track them twice.
// Complicated calls can result in error followed by cancellation. We shouldn't track them twice.
// don't trust me? try this:
// Flux.fromIterable(Collections.singletonList("event"))
// .flatMap(event -> Mono.error(new RuntimeException("boom"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public final class InstrumentationUtils {
public static final String MESSAGING_DESTINATION_NAME = "messaging.destination.name";
public static final String MESSAGING_DESTINATION_PARTITION_ID = "messaging.destination.partition.id";
public static final String MESSAGING_OPERATION_NAME = "messaging.operation.name";
public static final String MESSAGING_OPERATION_TYPE = "messaging.operation.type";
public static final String MESSAGING_SYSTEM = "messaging.system";
public static final String MESSAGING_CONSUMER_GROUP_NAME = "messaging.consumer.group.name";
public static final String MESSAGING_EVENTHUBS_MESSAGE_ENQUEUED_TIME = "messaging.eventhubs.message.enqueued_time";
Expand All @@ -27,7 +28,7 @@ public final class InstrumentationUtils {
public static final String MESSAGING_CLIENT_PUBLISHED_MESSAGES = "messaging.client.published.messages";
public static final String MESSAGING_CLIENT_CONSUMED_MESSAGES = "messaging.client.consumed.messages";
public static final String MESSAGING_CLIENT_OPERATION_DURATION = "messaging.client.operation.duration";
public static final String MESSAGING_CONSUMER_PROCESS_DURATION = "messaging.consumer.process.duration";
public static final String MESSAGING_PROCESS_DURATION = "messaging.process.duration";

// custom metrics
public static final String MESSAGING_EVENTHUBS_CONSUMER_LAG = "messaging.eventhubs.consumer.lag";
Expand Down Expand Up @@ -85,6 +86,21 @@ public static double getDurationInSeconds(Instant startTime) {
return durationNanos / 1_000_000_000d;
}

public static String getOperationType(OperationName name) {
switch (name) {
case SEND:
return "publish";
case RECEIVE:
return "receive";
case CHECKPOINT:
return "settle";
case PROCESS:
return "process";
default:
return null;
}
}

private InstrumentationUtils() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ public enum OperationName {
SEND("send"),
RECEIVE("receive"),
CHECKPOINT("checkpoint"),

// TODO (limolkova) we should document and standardize EventHubs operation names across languages
// https://github.com/open-telemetry/semantic-conventions/issues/750
GET_EVENT_HUB_PROPERTIES("get_event_hub_properties"),
GET_PARTITION_PROPERTIES("get_partition_properties");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@
import static com.azure.messaging.eventhubs.TestUtils.getSpanName;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.DIAGNOSTIC_ID_KEY;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_OPERATION_NAME;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_OPERATION_TYPE;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.TRACEPARENT_KEY;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.getOperationType;
import static com.azure.messaging.eventhubs.implementation.instrumentation.OperationName.EVENT;
import static com.azure.messaging.eventhubs.implementation.instrumentation.OperationName.GET_EVENT_HUB_PROPERTIES;
import static com.azure.messaging.eventhubs.implementation.instrumentation.OperationName.GET_PARTITION_PROPERTIES;
Expand Down Expand Up @@ -1585,7 +1587,8 @@ private void assertStartOptions(
assertEquals(kind, startOpts.getSpanKind());
assertAllAttributes(HOSTNAME, EVENT_HUB_NAME, partitionId, null, null, operationName,
startOpts.getAttributes());
assertNotNull(startOpts.getAttributes().get(MESSAGING_OPERATION_NAME));
assertEquals(operationName.toString(), startOpts.getAttributes().get(MESSAGING_OPERATION_NAME));
assertEquals(getOperationType(operationName), startOpts.getAttributes().get(MESSAGING_OPERATION_TYPE));

if (linkCount == 0) {
assertNull(startOpts.getLinks());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ public void testTracingMetricsEmptyBatch() throws Exception {
//Assert
verify(tracer, never()).start(anyString(), any(), any(Context.class));
assertEquals(0, meter.getCounters().get("messaging.client.consumed.messages").getMeasurements().size());
assertEquals(0, meter.getHistograms().get("messaging.consumer.process.duration").getMeasurements().size());
assertEquals(0, meter.getHistograms().get("messaging.process.duration").getMeasurements().size());
}

/**
Expand Down Expand Up @@ -1095,7 +1095,7 @@ private static void assertProcessMetrics(TestMeter meter, int batchSize, String
assertAllAttributes(HOSTNAME, EVENT_HUB_NAME, PARTITION_ID, CONSUMER_GROUP, expectedErrorType,
PROCESS, eventCounter.getMeasurements().get(0).getAttributes());

TestHistogram processDuration = meter.getHistograms().get("messaging.consumer.process.duration");
TestHistogram processDuration = meter.getHistograms().get("messaging.process.duration");
assertNotNull(processDuration);
assertEquals(1, processDuration.getMeasurements().size());
assertNotNull(processDuration.getMeasurements().get(0).getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_CONSUMER_GROUP_NAME;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_DESTINATION_PARTITION_ID;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_OPERATION_NAME;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_OPERATION_TYPE;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_SYSTEM;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_SYSTEM_VALUE;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.SERVER_ADDRESS;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.getOperationType;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

/**
* Contains helper methods for working with AMQP messages
Expand Down Expand Up @@ -209,7 +212,13 @@ public static void assertAllAttributes(String hostname, String entityName, Strin
assertEquals(entityName, attributes.get(MESSAGING_DESTINATION_NAME));
assertEquals(partitionId, attributes.get(MESSAGING_DESTINATION_PARTITION_ID));
assertEquals(consumerGroup, attributes.get(MESSAGING_CONSUMER_GROUP_NAME));
assertEquals(operationName == null ? null : operationName.toString(), attributes.get(MESSAGING_OPERATION_NAME));
if (operationName == null) {
assertNull(attributes.get(MESSAGING_OPERATION_NAME));
assertNull(attributes.get(MESSAGING_OPERATION_TYPE));
} else {
assertEquals(operationName.toString(), attributes.get(MESSAGING_OPERATION_NAME));
assertEquals(getOperationType(operationName), attributes.get(MESSAGING_OPERATION_TYPE));
}
assertEquals(errorType, attributes.get(ERROR_TYPE));
}

Expand Down
Loading

0 comments on commit 20b5d25

Please sign in to comment.