Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Sep 19, 2022
1 parent ca7f97c commit 9e13f75
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ class OpenTelemetryAttributes implements TelemetryAttributes {
private static Map<String, String> getMappings() {
Map<String, String> mappings = new HashMap<>();
// messaging mapping, attributes are defined in com.azure.core.amqp.implementation.ClientConstants
mappings.put("status", "otel.status_code");
mappings.put("entityName", "messaging.destination");
mappings.put("entityPath", "messaging.az.entity_path");
mappings.put("hostName", "net.peer.name");
mappings.put("errorCondition", "amqp.error_condition");
mappings.put("amqpStatusCode", "amqp.status_code");
mappings.put("amqpOperation", "amqp.operation");
mappings.put("deliveryState", "amqp.delivery_state");
mappings.put("partitionId", "messaging.az.partition_id");
mappings.put("consumerGroup", "messaging.eventhubs.consumer_group");

return Collections.unmodifiableMap(mappings);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,15 @@ public void attributeMappings() {
put("deliveryState", "rejected");
put("amqpStatusCode", "no_content");
put("amqpOperation", "peek");
put("partitionId", 42);
put("status", "error");
put("consumerGroup", "$Default");
}});

assertEquals(OpenTelemetryAttributes.class, attributeCollection.getClass());
Attributes attributes = ((OpenTelemetryAttributes) attributeCollection).get();

assertEquals(8, attributes.size());
assertEquals(11, attributes.size());
assertEquals("value", attributes.get(AttributeKey.stringKey("foobar")));
assertEquals("host", attributes.get(AttributeKey.stringKey("net.peer.name")));
assertEquals("entity", attributes.get(AttributeKey.stringKey("messaging.destination")));
Expand All @@ -83,6 +86,9 @@ public void attributeMappings() {
assertEquals("rejected", attributes.get(AttributeKey.stringKey("amqp.delivery_state")));
assertEquals("peek", attributes.get(AttributeKey.stringKey("amqp.operation")));
assertEquals("no_content", attributes.get(AttributeKey.stringKey("amqp.status_code")));
assertEquals(42, attributes.get(AttributeKey.longKey("messaging.az.partition_id")));
assertEquals("error", attributes.get(AttributeKey.stringKey("otel.status_code")));
assertEquals("$Default", attributes.get(AttributeKey.stringKey("messaging.eventhubs.consumer_group")));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@

class MetricsHelper {
private static final ClientLogger LOGGER = new ClientLogger(MetricsHelper.class);
public static final String ENTITY_NAME_KEY = "entityName";
public static final String HOSTNAME_KEY = "hostName";
public static final String PARTITION_ID_KEY = "partitionId";
public static final String CONSUMER_GROUP_KEY = "consumerGroup";

// Make sure attribute names are consistent across AMQP Core, EventHubs, ServiceBus when applicable
// and mapped correctly in OTel Metrics https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java
private static final String ENTITY_NAME_KEY = "entityName";
private static final String HOSTNAME_KEY = "hostName";
private static final String PARTITION_ID_KEY = "partitionId";
private static final String CONSUMER_GROUP_KEY = "consumerGroup";
private static final String STATUS_KEY = "status";

// since checkpoint store is stateless it might be used for endless number of eventhubs.
// we'll have as many subscriptions as there are combinations of fqdn, eventhub name, partitionId and consumer group.
Expand Down Expand Up @@ -67,7 +71,7 @@ class MetricsHelper {
this.isEnabled = meter != null && meter.isEnabled();
if (isEnabled) {
this.lastSequenceNumber = this.meter.createLongGauge("messaging.eventhubs.checkpoint.sequence_number", "Last successfully checkpointed sequence number.", "seqNo");
this.checkpointCounter = this.meter.createLongCounter("messaging.eventhubs.checkpoint", "Number of checkpoints.", null);
this.checkpointCounter = this.meter.createLongCounter("messaging.eventhubs.checkpoints", "Number of checkpoints.", null);
} else {
this.lastSequenceNumber = null;
this.checkpointCounter = null;
Expand Down Expand Up @@ -124,7 +128,7 @@ private Map<String, Object> createAttributes(Checkpoint checkpoint, String statu
attributesMap.put(PARTITION_ID_KEY, checkpoint.getPartitionId());
attributesMap.put(CONSUMER_GROUP_KEY, checkpoint.getConsumerGroup());
if (status != null) {
attributesMap.put("status", status);
attributesMap.put(STATUS_KEY, status);
}

return attributesMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ public void testUpdateEnabledMetrics() {
assertEquals(2L, seqNoMeasurement.getValue());
assertCommonAttributes(checkpoint, seqNoMeasurement.getAttributes());

assertTrue(meter.getCounters().containsKey("messaging.eventhubs.checkpoint"));
TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoint");
assertTrue(meter.getCounters().containsKey("messaging.eventhubs.checkpoints"));
TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints");
assertEquals(1, checkpoints.getMeasurements().size());
TestMeasurement<Long> checkpointMeasurements = checkpoints.getMeasurements().get(0);
assertEquals(1, checkpointMeasurements.getValue());
Expand Down Expand Up @@ -202,7 +202,7 @@ public void testUpdateEnabledMetricsFailure() {
// sequence number is only reported for successfull checkpoints
assertEquals(0, meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number").getSubscriptions().size());

TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoint");
TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints");
TestMeasurement<Long> checkpointMeasurements = checkpoints.getMeasurements().get(0);
assertEquals(1, checkpointMeasurements.getValue());
assertStatusAttributes(checkpoint, "error", checkpointMeasurements.getAttributes());
Expand All @@ -225,7 +225,7 @@ public void testUpdateEnabledMetricsNullSeqNo() {

assertEquals(0, meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number").getSubscriptions().size());

TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoint");
TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints");
TestMeasurement<Long> checkpointMeasurements = checkpoints.getMeasurements().get(0);
assertEquals(1, checkpointMeasurements.getValue());
assertStatusAttributes(checkpoint, "ok", checkpointMeasurements.getAttributes());
Expand Down Expand Up @@ -265,7 +265,7 @@ public void testUpdateEnabledMetricsTooManyAttributes() {
i[0]++;
});

TestCounter checkpointCounter = meter.getCounters().get("messaging.eventhubs.checkpoint");
TestCounter checkpointCounter = meter.getCounters().get("messaging.eventhubs.checkpoints");
assertEquals(MAX_ATTRIBUTES_SETS, checkpointCounter.getMeasurements().size());

final int[] j = {0};
Expand Down Expand Up @@ -311,7 +311,7 @@ public void testUpdateEnabledMetricsMultipleMeasurements() {
TestMeasurement<Long> seqNoMeasurement = subs.getMeasurements().get(0);
assertEquals(42L, seqNoMeasurement.getValue());

TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoint");
TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints");
assertEquals(2, checkpoints.getMeasurements().size());

assertEquals(1, checkpoints.getMeasurements().get(0).getValue());
Expand Down Expand Up @@ -363,7 +363,7 @@ public void testUpdateEnabledMetricsMultipleHubs() {
assertEquals(42L, seqNoMeasurement2.getValue());
assertCommonAttributes(checkpoint2, seqNoMeasurement2.getAttributes());

TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoint");
TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints");
assertEquals(2, checkpoints.getMeasurements().size());

assertEquals(1, checkpoints.getMeasurements().get(0).getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public class EventHubClientBuilder implements
LIBRARY_NAME = properties.getOrDefault(NAME_KEY, UNKNOWN);
LIBRARY_VERSION = properties.getOrDefault(VERSION_KEY, UNKNOWN);
}

/**
* Keeps track of the open clients that were created from this builder when there is a shared connection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,7 @@ public Mono<EventDataBatch> createBatch(CreateBatchOptions options) {
MAX_PARTITION_KEY_LENGTH)));
}

final String entityPath = getEntityPath(partitionId);

return getSendLink(entityPath)
return getSendLink(partitionId)
.flatMap(link -> link.getLinkSize()
.flatMap(size -> {
final int maximumLinkSize = size > 0
Expand Down Expand Up @@ -581,8 +579,7 @@ public Mono<Void> send(EventDataBatch batch) {
parentContext.set(tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, sharedContext, ProcessKind.SEND));
}

final String entityPath = getEntityPath(batch.getPartitionId());
final Mono<Void> sendMessage = getSendLink(entityPath)
final Mono<Void> sendMessage = getSendLink(batch.getPartitionId())
.flatMap(link -> messages.size() == 1
? link.send(messages.get(0))
: link.send(messages));
Expand All @@ -592,10 +589,10 @@ public Mono<Void> send(EventDataBatch batch) {
.publishOn(scheduler)
.doOnEach(signal -> {
Context context = isTracingEnabled ? parentContext.get() : Context.NONE;
metricsProvider.reportBatchSend(batch, batch.getPartitionId(), signal.getThrowable(), context);
if (isTracingEnabled) {
tracerProvider.endSpan(context, signal);
}
metricsProvider.reportBatchSend(batch, batch.getPartitionId(), signal.getThrowable(), context);
});
}

Expand All @@ -611,8 +608,7 @@ private Mono<Void> sendInternal(Flux<EventData> events, SendOptions options) {
partitionKey, partitionId)));
}

final String entityPath = getEntityPath(options.getPartitionId());
return getSendLink(entityPath)
return getSendLink(options.getPartitionId())
.flatMap(link -> link.getLinkSize()
.flatMap(size -> {
final int batchSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES;
Expand Down Expand Up @@ -641,7 +637,8 @@ private String getEntityPath(String partitionId) {
: String.format(Locale.US, SENDER_ENTITY_PATH_FORMAT, eventHubName, partitionId);
}

private Mono<AmqpSendLink> getSendLink(String entityPath) {
private Mono<AmqpSendLink> getSendLink(String partitionId) {
final String entityPath = getEntityPath(partitionId);
final String linkName = entityPath;

return connectionProcessor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
import java.util.concurrent.ConcurrentHashMap;

import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_NAME_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.HOSTNAME_KEY;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.CONSUMER_GROUP_KEY;

public class EventHubsMetricsProvider {
private static final String PARTITION_ID_KEY = "partitionId";
private static final String SEND_STATUS_KEY = "status";
private final Meter meter;
private final boolean isEnabled;

Expand All @@ -34,18 +37,18 @@ public EventHubsMetricsProvider(Meter meter, String namespace, String entityName
this.isEnabled = meter != null && meter.isEnabled();
if (this.isEnabled) {
Map<String, Object> commonAttributesMap = new HashMap<>();
commonAttributesMap.put("hostName", namespace);
commonAttributesMap.put(HOSTNAME_KEY, namespace);
commonAttributesMap.put(ENTITY_NAME_KEY, entityName);
if (consumerGroup != null) {
commonAttributesMap.put("consumerGroup", consumerGroup);
commonAttributesMap.put(CONSUMER_GROUP_KEY, consumerGroup);
}

Map<String, Object> successMap = new HashMap<>(commonAttributesMap);
successMap.put("status", "ok");
successMap.put(SEND_STATUS_KEY, "ok");
this.sendAttributeCacheSuccess = new AttributeCache(PARTITION_ID_KEY, successMap);

Map<String, Object> failureMap = new HashMap<>(commonAttributesMap);
failureMap.put("status", "error");
failureMap.put(SEND_STATUS_KEY, "error");
this.sendAttributeCacheFailure = new AttributeCache(PARTITION_ID_KEY, failureMap);

this.receiveAttributeCache = new AttributeCache(PARTITION_ID_KEY, commonAttributesMap);
Expand Down

0 comments on commit 9e13f75

Please sign in to comment.