diff --git a/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java b/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java index 5c5d0a32280f7..969e62671f88c 100644 --- a/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java +++ b/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java @@ -21,6 +21,7 @@ class OpenTelemetryAttributes implements TelemetryAttributes { private static Map getMappings() { Map mappings = new HashMap<>(); // messaging mapping, attributes are defined in com.azure.core.amqp.implementation.ClientConstants + mappings.put("status", "otel.status_code"); mappings.put("entityName", "messaging.destination"); mappings.put("entityPath", "messaging.az.entity_path"); mappings.put("hostName", "net.peer.name"); @@ -28,6 +29,8 @@ private static Map getMappings() { mappings.put("amqpStatusCode", "amqp.status_code"); mappings.put("amqpOperation", "amqp.operation"); mappings.put("deliveryState", "amqp.delivery_state"); + mappings.put("partitionId", "messaging.eventhubs.partition_id"); + mappings.put("consumerGroup", "messaging.eventhubs.consumer_group"); return Collections.unmodifiableMap(mappings); } diff --git a/sdk/core/azure-core-metrics-opentelemetry/src/test/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributesTests.java b/sdk/core/azure-core-metrics-opentelemetry/src/test/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributesTests.java index 35bccbca890d8..87520a083bc64 100644 --- a/sdk/core/azure-core-metrics-opentelemetry/src/test/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributesTests.java +++ b/sdk/core/azure-core-metrics-opentelemetry/src/test/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributesTests.java @@ -69,12 +69,15 @@ public void attributeMappings() { put("deliveryState", "rejected"); put("amqpStatusCode", "no_content"); put("amqpOperation", "peek"); + put("partitionId", 42); + put("status", "error"); + put("consumerGroup", "$Default"); }}); assertEquals(OpenTelemetryAttributes.class, attributeCollection.getClass()); Attributes attributes = ((OpenTelemetryAttributes) attributeCollection).get(); - assertEquals(8, attributes.size()); + assertEquals(11, attributes.size()); assertEquals("value", attributes.get(AttributeKey.stringKey("foobar"))); assertEquals("host", attributes.get(AttributeKey.stringKey("net.peer.name"))); assertEquals("entity", attributes.get(AttributeKey.stringKey("messaging.destination"))); @@ -83,6 +86,9 @@ public void attributeMappings() { assertEquals("rejected", attributes.get(AttributeKey.stringKey("amqp.delivery_state"))); assertEquals("peek", attributes.get(AttributeKey.stringKey("amqp.operation"))); assertEquals("no_content", attributes.get(AttributeKey.stringKey("amqp.status_code"))); + assertEquals(42, attributes.get(AttributeKey.longKey("messaging.eventhubs.partition_id"))); + assertEquals("error", attributes.get(AttributeKey.stringKey("otel.status_code"))); + assertEquals("$Default", attributes.get(AttributeKey.stringKey("messaging.eventhubs.consumer_group"))); } @Test diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/pom.xml b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/pom.xml index 0bd716a1421e7..52a1424c186c9 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/pom.xml +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/pom.xml @@ -58,6 +58,12 @@ + + com.azure + azure-core-test + 1.12.0 + test + com.azure azure-identity diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java index 88d933c67e949..ea9d10685f26d 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java @@ -4,30 +4,32 @@ package com.azure.messaging.eventhubs.checkpointstore.blob; import com.azure.core.http.rest.Response; +import com.azure.core.util.ClientOptions; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.metrics.MeterProvider; import com.azure.messaging.eventhubs.CheckpointStore; import com.azure.messaging.eventhubs.EventProcessorClient; import com.azure.messaging.eventhubs.models.Checkpoint; import com.azure.messaging.eventhubs.models.PartitionOwnership; import com.azure.storage.blob.BlobAsyncClient; import com.azure.storage.blob.BlobContainerAsyncClient; -import com.azure.storage.blob.models.BlobRequestConditions; import com.azure.storage.blob.models.BlobItem; -import com.azure.storage.blob.models.BlobListDetails; import com.azure.storage.blob.models.BlobItemProperties; +import com.azure.storage.blob.models.BlobListDetails; +import com.azure.storage.blob.models.BlobRequestConditions; import com.azure.storage.blob.models.ListBlobsOptions; -import java.util.List; -import java.util.Objects; -import java.util.function.Function; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import static java.nio.charset.StandardCharsets.UTF_8; @@ -66,6 +68,7 @@ public class BlobCheckpointStore implements CheckpointStore { private static final ClientLogger LOGGER = new ClientLogger(BlobCheckpointStore.class); private final BlobContainerAsyncClient blobContainerAsyncClient; + private final MetricsHelper metricsHelper; private final Map blobClients = new ConcurrentHashMap<>(); /** @@ -75,7 +78,20 @@ public class BlobCheckpointStore implements CheckpointStore { * blobs in the storage container. */ public BlobCheckpointStore(BlobContainerAsyncClient blobContainerAsyncClient) { + this(blobContainerAsyncClient, null); + } + + + /** + * Creates an instance of BlobCheckpointStore. + * + * @param blobContainerAsyncClient The {@link BlobContainerAsyncClient} this instance will use to read and update + * @param options The {@link ClientOptions} to configure this instance. + * blobs in the storage container. + */ + public BlobCheckpointStore(BlobContainerAsyncClient blobContainerAsyncClient, ClientOptions options) { this.blobContainerAsyncClient = blobContainerAsyncClient; + this.metricsHelper = new MetricsHelper(options == null ? null : options.getMetricsOptions(), MeterProvider.getDefaultProvider()); } /** @@ -256,6 +272,11 @@ public Mono updateCheckpoint(Checkpoint checkpoint) { return blobAsyncClient.getBlockBlobAsyncClient().uploadWithResponse(Flux.just(UPLOAD_DATA), 0, null, metadata, null, null, null).then(); } + }) + .doOnEach(signal -> { + if (signal.isOnComplete() || signal.isOnError()) { + metricsHelper.reportCheckpoint(checkpoint, blobName, !signal.hasError()); + } }); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java new file mode 100644 index 0000000000000..c0adb39bdacdd --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java @@ -0,0 +1,188 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs.checkpointstore.blob; + +import com.azure.core.util.Context; +import com.azure.core.util.CoreUtils; +import com.azure.core.util.MetricsOptions; +import com.azure.core.util.TelemetryAttributes; +import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.metrics.LongCounter; +import com.azure.core.util.metrics.LongGauge; +import com.azure.core.util.metrics.Meter; +import com.azure.core.util.metrics.MeterProvider; +import com.azure.messaging.eventhubs.models.Checkpoint; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +final class MetricsHelper { + private static final ClientLogger LOGGER = new ClientLogger(MetricsHelper.class); + + // Make sure attribute names are consistent across AMQP Core, EventHubs, ServiceBus when applicable + // and mapped correctly in OTel Metrics https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java + private static final String ENTITY_NAME_KEY = "entityName"; + private static final String HOSTNAME_KEY = "hostName"; + private static final String PARTITION_ID_KEY = "partitionId"; + private static final String CONSUMER_GROUP_KEY = "consumerGroup"; + private static final String STATUS_KEY = "status"; + + // since checkpoint store is stateless it might be used for endless number of eventhubs. + // we'll have as many subscriptions as there are combinations of fqdn, eventhub name, partitionId and consumer group. + // In the unlikely case it's shared across a lot of EH client instances, metrics would be too costly + // and unhelpful. So, let's just set a hard limit on number of subscriptions. + private static final int MAX_ATTRIBUTES_SETS = 100; + + private static final String PROPERTIES_FILE = "azure-messaging-eventhubs-checkpointstore-blob.properties"; + private static final String NAME_KEY = "name"; + private static final String VERSION_KEY = "version"; + private static final String LIBRARY_NAME; + private static final String LIBRARY_VERSION; + private static final String UNKNOWN = "UNKNOWN"; + + static { + final Map properties = CoreUtils.getProperties(PROPERTIES_FILE); + LIBRARY_NAME = properties.getOrDefault(NAME_KEY, UNKNOWN); + LIBRARY_VERSION = properties.getOrDefault(VERSION_KEY, UNKNOWN); + } + + private final ConcurrentHashMap common = new ConcurrentHashMap<>(); + private final ConcurrentHashMap checkpointFailure = new ConcurrentHashMap<>(); + private final ConcurrentHashMap checkpointSuccess = new ConcurrentHashMap<>(); + private final ConcurrentHashMap seqNoSubscriptions = new ConcurrentHashMap<>(); + + private volatile boolean maxCapacityReached = false; + + private final Meter meter; + private final LongGauge lastSequenceNumber; + private final LongCounter checkpointCounter; + private final boolean isEnabled; + + MetricsHelper(MetricsOptions metricsOptions, MeterProvider meterProvider) { + if (areMetricsEnabled(metricsOptions)) { + this.meter = meterProvider.createMeter(LIBRARY_NAME, LIBRARY_VERSION, metricsOptions); + this.isEnabled = this.meter.isEnabled(); + } else { + this.meter = null; + this.isEnabled = false; + } + + if (isEnabled) { + this.lastSequenceNumber = this.meter.createLongGauge("messaging.eventhubs.checkpoint.sequence_number", "Last successfully checkpointed sequence number.", "seqNo"); + this.checkpointCounter = this.meter.createLongCounter("messaging.eventhubs.checkpoints", "Number of checkpoints.", null); + } else { + this.lastSequenceNumber = null; + this.checkpointCounter = null; + } + } + + void reportCheckpoint(Checkpoint checkpoint, String attributesId, boolean success) { + if (!isEnabled || !(lastSequenceNumber.isEnabled() && checkpointCounter.isEnabled())) { + return; + } + + if (!maxCapacityReached && (seqNoSubscriptions.size() >= MAX_ATTRIBUTES_SETS || common.size() >= MAX_ATTRIBUTES_SETS)) { + LOGGER.error("Too many attribute combinations are reported for checkpoint metrics, ignoring any new dimensions."); + maxCapacityReached = true; + } + + if (lastSequenceNumber.isEnabled() && success) { + updateCurrentValue(attributesId, checkpoint); + } + + if (checkpointCounter.isEnabled()) { + TelemetryAttributes attributes = null; + if (success) { + attributes = getOrCreate(checkpointSuccess, attributesId, checkpoint, "ok"); + } else { + attributes = getOrCreate(checkpointFailure, attributesId, checkpoint, "error"); + } + if (attributes != null) { + checkpointCounter.add(1, attributes, Context.NONE); + } + } + } + + private TelemetryAttributes getOrCreate(ConcurrentHashMap source, String attributesId, Checkpoint checkpoint, String status) { + if (maxCapacityReached) { + return source.get(attributesId); + } + + return source.computeIfAbsent(attributesId, i -> meter.createAttributes(createAttributes(checkpoint, status))); + } + + private Map createAttributes(Checkpoint checkpoint, String status) { + Map attributesMap = new HashMap<>(5); + attributesMap.put(HOSTNAME_KEY, checkpoint.getFullyQualifiedNamespace()); + attributesMap.put(ENTITY_NAME_KEY, checkpoint.getEventHubName()); + attributesMap.put(PARTITION_ID_KEY, checkpoint.getPartitionId()); + attributesMap.put(CONSUMER_GROUP_KEY, checkpoint.getConsumerGroup()); + if (status != null) { + attributesMap.put(STATUS_KEY, status); + } + + return attributesMap; + } + + private void updateCurrentValue(String attributesId, Checkpoint checkpoint) { + if (checkpoint.getSequenceNumber() == null) { + return; + } + + final CurrentValue valueSupplier; + if (maxCapacityReached) { + valueSupplier = seqNoSubscriptions.get(attributesId); + if (valueSupplier == null) { + return; + } + } else { + TelemetryAttributes attributes = getOrCreate(common, attributesId, checkpoint, null); + if (attributes == null) { + return; + } + + valueSupplier = seqNoSubscriptions.computeIfAbsent(attributesId, a -> { + AtomicReference lastSeqNo = new AtomicReference<>(); + return new CurrentValue(lastSequenceNumber.registerCallback(() -> lastSeqNo.get(), attributes), lastSeqNo); + }); + } + + valueSupplier.set(checkpoint.getSequenceNumber()); + } + + private static boolean areMetricsEnabled(MetricsOptions options) { + if (options == null || options.isEnabled()) { + return true; + } + + return false; + } + + private static class CurrentValue { + private final AtomicReference lastSeqNo; + private final AutoCloseable subscription; + + CurrentValue(AutoCloseable subscription, AtomicReference lastSeqNo) { + this.subscription = subscription; + this.lastSeqNo = lastSeqNo; + } + + void set(long value) { + lastSeqNo.set(value); + } + + void close() { + if (subscription != null) { + try { + subscription.close(); + } catch (Exception e) { + // should never happen + throw LOGGER.logThrowableAsWarning(new RuntimeException(e)); + } + } + } + } +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/resources/azure-messaging-eventhubs-checkpointstore-blob.properties b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/resources/azure-messaging-eventhubs-checkpointstore-blob.properties new file mode 100644 index 0000000000000..ca812989b4f27 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/resources/azure-messaging-eventhubs-checkpointstore-blob.properties @@ -0,0 +1,2 @@ +name=${project.artifactId} +version=${project.version} diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java index 30a6edde88b02..d511e855dc7b1 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java @@ -17,11 +17,13 @@ import com.azure.storage.blob.models.BlobItem; import com.azure.storage.blob.models.BlobItemProperties; import com.azure.storage.blob.models.ListBlobsOptions; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -56,9 +58,20 @@ public class BlobEventProcessorClientStoreTest { @Mock private BlobAsyncClient blobAsyncClient; + private AutoCloseable autoCloseable; + @BeforeEach - public void setup() { - MockitoAnnotations.initMocks(this); + public void beforeEach() { + this.autoCloseable = MockitoAnnotations.openMocks(this); + } + + @AfterEach + public void afterEach() throws Exception { + if (autoCloseable != null) { + autoCloseable.close(); + } + + Mockito.framework().clearInlineMock(this); } @Test diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelperTests.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelperTests.java new file mode 100644 index 0000000000000..8c4b80d5073b5 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelperTests.java @@ -0,0 +1,327 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs.checkpointstore.blob; + +import com.azure.core.test.utils.metrics.TestCounter; +import com.azure.core.test.utils.metrics.TestGauge; +import com.azure.core.test.utils.metrics.TestMeasurement; +import com.azure.core.test.utils.metrics.TestMeter; +import com.azure.core.util.MetricsOptions; +import com.azure.core.util.metrics.Meter; +import com.azure.messaging.eventhubs.models.Checkpoint; +import com.azure.storage.blob.models.BlobItem; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.junit.jupiter.api.parallel.Isolated; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@Execution(ExecutionMode.SAME_THREAD) +@Isolated +public class MetricsHelperTests { + private static final int MAX_ATTRIBUTES_SETS = 100; + + @Test + public void testUpdateDisabledMetrics() { + Checkpoint checkpoint = new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh") + .setConsumerGroup("cg") + .setPartitionId("0") + .setSequenceNumber(2L) + .setOffset(100L); + + Meter meter = mock(Meter.class); + when(meter.isEnabled()).thenReturn(false); + + TestMeterProvider testProvider = new TestMeterProvider((lib, ver, opts) -> { + assertEquals("azure-messaging-eventhubs-checkpointstore-blob", lib); + assertNotNull(ver); + return meter; + }); + + + MetricsHelper helper = new MetricsHelper(new MetricsOptions(), testProvider); + helper.reportCheckpoint(checkpoint, "ns/eh/ch/0", true); + + verify(meter, atLeastOnce()).isEnabled(); + verify(meter, never()).createAttributes(anyMap()); + verify(meter, never()).createLongGauge(any(), any(), any()); + verify(meter, never()).createLongCounter(any(), any(), any()); + } + + @Test + public void testUpdateDisabledMetricsViaOptions() { + Checkpoint checkpoint = new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh") + .setConsumerGroup("cg") + .setPartitionId("0") + .setSequenceNumber(2L) + .setOffset(100L); + + Meter meter = mock(Meter.class); + TestMeterProvider testProvider = new TestMeterProvider((lib, ver, opts) -> meter); + MetricsHelper helper = new MetricsHelper(new MetricsOptions().setEnabled(false), testProvider); + helper.reportCheckpoint(checkpoint, "ns/eh/cg/0", true); + + verify(meter, never()).createAttributes(anyMap()); + verify(meter, never()).createLongGauge(any(), any(), any()); + verify(meter, never()).createLongCounter(any(), any(), any()); + } + + @Test + public void testUpdateEnabledMetrics() { + Checkpoint checkpoint = new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh") + .setConsumerGroup("cg") + .setPartitionId("0") + .setSequenceNumber(2L) + .setOffset(100L); + + TestMeter meter = new TestMeter(); + TestMeterProvider testProvider = new TestMeterProvider((lib, ver, opts) -> { + assertEquals("azure-messaging-eventhubs-checkpointstore-blob", lib); + assertNotNull(ver); + return meter; + }); + + MetricsHelper helper = new MetricsHelper(new MetricsOptions(), testProvider); + helper.reportCheckpoint(checkpoint, "ns/eh/cg/0", true); + + assertTrue(meter.getGauges().containsKey("messaging.eventhubs.checkpoint.sequence_number")); + TestGauge seqNo = meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number"); + assertEquals(1, seqNo.getSubscriptions().size()); + TestGauge.Subscription subs = seqNo.getSubscriptions().get(0); + + assertEquals(0, subs.getMeasurements().size()); + subs.measure(); + + TestMeasurement seqNoMeasurement = subs.getMeasurements().get(0); + assertEquals(2L, seqNoMeasurement.getValue()); + assertCommonAttributes(checkpoint, seqNoMeasurement.getAttributes()); + + assertTrue(meter.getCounters().containsKey("messaging.eventhubs.checkpoints")); + TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints"); + assertEquals(1, checkpoints.getMeasurements().size()); + TestMeasurement checkpointMeasurements = checkpoints.getMeasurements().get(0); + assertEquals(1, checkpointMeasurements.getValue()); + assertStatusAttributes(checkpoint, "ok", checkpointMeasurements.getAttributes()); + } + + + @Test + public void testUpdateEnabledMetricsFailure() { + Checkpoint checkpoint = new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh") + .setConsumerGroup("cg") + .setPartitionId("0") + .setSequenceNumber(2L) + .setOffset(100L); + + TestMeter meter = new TestMeter(); + MetricsHelper helper = new MetricsHelper(new MetricsOptions(), new TestMeterProvider((lib, ver, opts) -> meter)); + helper.reportCheckpoint(checkpoint, "ns/eh/cg/0", false); + + // sequence number is only reported for successful checkpoints + assertEquals(0, meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number").getSubscriptions().size()); + + TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints"); + TestMeasurement checkpointMeasurements = checkpoints.getMeasurements().get(0); + assertEquals(1, checkpointMeasurements.getValue()); + assertStatusAttributes(checkpoint, "error", checkpointMeasurements.getAttributes()); + } + + @Test + public void testUpdateEnabledMetricsNullSeqNo() { + Checkpoint checkpoint = new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh") + .setConsumerGroup("cg") + .setPartitionId("0") + .setOffset(100L); + + TestMeter meter = new TestMeter(); + MetricsHelper helper = new MetricsHelper(new MetricsOptions(), new TestMeterProvider((lib, ver, opts) -> meter)); + helper.reportCheckpoint(checkpoint, "ns/eh/cg/0", true); + + assertEquals(0, meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number").getSubscriptions().size()); + + TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints"); + TestMeasurement checkpointMeasurements = checkpoints.getMeasurements().get(0); + assertEquals(1, checkpointMeasurements.getValue()); + assertStatusAttributes(checkpoint, "ok", checkpointMeasurements.getAttributes()); + } + + @Test + public void testUpdateEnabledMetricsTooManyAttributes() { + TestMeter meter = new TestMeter(); + List checkpoints = IntStream.range(0, MAX_ATTRIBUTES_SETS + 10) + .mapToObj(n -> new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh") + .setConsumerGroup("cg") + .setPartitionId(String.valueOf(n)) + .setSequenceNumber((long) n) + .setOffset(100L)) + .collect(Collectors.toList()); + + MetricsHelper helper = new MetricsHelper(new MetricsOptions(), new TestMeterProvider((lib, ver, opts) -> meter)); + checkpoints.forEach(ch -> helper.reportCheckpoint(ch, "ns/eh/cg/" + ch.getPartitionId(), true)); + + List subscriptions = meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number").getSubscriptions(); + assertEquals(MAX_ATTRIBUTES_SETS, subscriptions.size()); + subscriptions.forEach(subs -> subs.measure()); + + final int[] i = {0}; + subscriptions.forEach(subs -> { + assertEquals(1, subs.getMeasurements().size()); + TestMeasurement seqNoMeasurement = subs.getMeasurements().get(0); + assertEquals(i[0], seqNoMeasurement.getValue()); + assertCommonAttributes(checkpoints.get(i[0]), seqNoMeasurement.getAttributes()); + i[0]++; + }); + + TestCounter checkpointCounter = meter.getCounters().get("messaging.eventhubs.checkpoints"); + assertEquals(MAX_ATTRIBUTES_SETS, checkpointCounter.getMeasurements().size()); + + final int[] j = {0}; + checkpointCounter.getMeasurements().forEach(m -> { + assertEquals(1, m.getValue()); + assertStatusAttributes(checkpoints.get(j[0]), "ok", m.getAttributes()); + j[0]++; + }); + } + + @Test + public void testUpdateEnabledMetricsMultipleMeasurements() { + Checkpoint checkpoint1 = new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh") + .setConsumerGroup("cg") + .setPartitionId("0") + .setSequenceNumber(2L) + .setOffset(100L); + + Checkpoint checkpoint2 = new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh") + .setConsumerGroup("cg") + .setPartitionId("0") + .setSequenceNumber(42L) + .setOffset(100L); + + TestMeter meter = new TestMeter(); + MetricsHelper helper = new MetricsHelper(new MetricsOptions(), new TestMeterProvider((lib, ver, opts) -> meter)); + helper.reportCheckpoint(checkpoint1, "ns/eh/cg/0", true); + helper.reportCheckpoint(checkpoint2, "ns/eh/cg/0", true); + + TestGauge seqNo = meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number"); + TestGauge.Subscription subs = seqNo.getSubscriptions().get(0); + subs.measure(); + + TestMeasurement seqNoMeasurement = subs.getMeasurements().get(0); + assertEquals(42L, seqNoMeasurement.getValue()); + + TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints"); + assertEquals(2, checkpoints.getMeasurements().size()); + + assertEquals(1, checkpoints.getMeasurements().get(0).getValue()); + assertEquals(1, checkpoints.getMeasurements().get(1).getValue()); + assertStatusAttributes(checkpoint2, "ok", checkpoints.getMeasurements().get(1).getAttributes()); + } + + @Test + public void testUpdateEnabledMetricsMultipleHubs() { + Checkpoint checkpoint1 = new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh1") + .setConsumerGroup("cg") + .setPartitionId("0") + .setSequenceNumber(2L) + .setOffset(100L); + + Checkpoint checkpoint2 = new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh2") + .setConsumerGroup("cg") + .setPartitionId("0") + .setSequenceNumber(42L) + .setOffset(100L); + + TestMeter meter = new TestMeter(); + MetricsHelper helper = new MetricsHelper(new MetricsOptions(), new TestMeterProvider((lib, ver, opts) -> meter)); + + helper.reportCheckpoint(checkpoint1, "ns/eh1/cg/0", true); + helper.reportCheckpoint(checkpoint2, "ns/eh2/cg/0", true); + + TestGauge seqNo = meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number"); + assertEquals(2, seqNo.getSubscriptions().size()); + TestGauge.Subscription subs1 = seqNo.getSubscriptions().get(0); + TestGauge.Subscription subs2 = seqNo.getSubscriptions().get(1); + subs1.measure(); + subs2.measure(); + + TestMeasurement seqNoMeasurement1 = subs1.getMeasurements().get(0); + assertEquals(2L, seqNoMeasurement1.getValue()); + assertCommonAttributes(checkpoint1, seqNoMeasurement1.getAttributes()); + + TestMeasurement seqNoMeasurement2 = subs2.getMeasurements().get(0); + assertEquals(42L, seqNoMeasurement2.getValue()); + assertCommonAttributes(checkpoint2, seqNoMeasurement2.getAttributes()); + + TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints"); + assertEquals(2, checkpoints.getMeasurements().size()); + + assertEquals(1, checkpoints.getMeasurements().get(0).getValue()); + assertStatusAttributes(checkpoint1, "ok", checkpoints.getMeasurements().get(0).getAttributes()); + assertEquals(1, checkpoints.getMeasurements().get(1).getValue()); + assertStatusAttributes(checkpoint2, "ok", checkpoints.getMeasurements().get(1).getAttributes()); + } + + + private void assertStatusAttributes(Checkpoint checkpoint, String expectedStatus, Map attributes) { + assertEquals(5, attributes.size()); + assertEquals(checkpoint.getFullyQualifiedNamespace(), attributes.get("hostName")); + assertEquals(checkpoint.getEventHubName(), attributes.get("entityName")); + assertEquals(checkpoint.getPartitionId(), attributes.get("partitionId")); + assertEquals(checkpoint.getConsumerGroup(), attributes.get("consumerGroup")); + assertEquals(expectedStatus, attributes.get("status")); + } + + private void assertCommonAttributes(Checkpoint checkpoint, Map attributes) { + assertEquals(4, attributes.size()); + assertEquals(checkpoint.getFullyQualifiedNamespace(), attributes.get("hostName")); + assertEquals(checkpoint.getEventHubName(), attributes.get("entityName")); + assertEquals(checkpoint.getPartitionId(), attributes.get("partitionId")); + assertEquals(checkpoint.getConsumerGroup(), attributes.get("consumerGroup")); + } + + private BlobItem getCheckpointBlobItem(String offset, String sequenceNumber, String blobName) { + Map metadata = new HashMap<>(); + metadata.put("sequencenumber", sequenceNumber); + metadata.put("offset", offset); + return new BlobItem() + .setName(blobName) + .setMetadata(metadata); + } +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/TestMeterProvider.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/TestMeterProvider.java new file mode 100644 index 0000000000000..0bf36956c5c77 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/TestMeterProvider.java @@ -0,0 +1,26 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs.checkpointstore.blob; + +import com.azure.core.util.MetricsOptions; +import com.azure.core.util.metrics.Meter; +import com.azure.core.util.metrics.MeterProvider; + +public class TestMeterProvider implements MeterProvider { + + private final MeterFactory meterFactory; + public TestMeterProvider(MeterFactory meterFactory) { + this.meterFactory = meterFactory; + } + + @Override + public Meter createMeter(String libraryName, String libraryVersion, MetricsOptions options) { + return meterFactory.createMeter(libraryName, libraryVersion, options); + } + + @FunctionalInterface + interface MeterFactory { + Meter createMeter(String libraryName, String libraryVersion, MetricsOptions options); + } +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java index aacd7e8836376..4e777deaa9969 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java @@ -6,6 +6,7 @@ import com.azure.core.amqp.implementation.MessageSerializer; import com.azure.core.amqp.implementation.TracerProvider; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.metrics.Meter; import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor; import com.azure.messaging.eventhubs.implementation.EventHubManagementNode; import reactor.core.publisher.Flux; @@ -33,10 +34,11 @@ class EventHubAsyncClient implements Closeable { private final Runnable onClientClose; private final TracerProvider tracerProvider; private final String identifier; + private final Meter meter; EventHubAsyncClient(EventHubConnectionProcessor connectionProcessor, TracerProvider tracerProvider, MessageSerializer messageSerializer, Scheduler scheduler, boolean isSharedConnection, Runnable onClientClose, - String identifier) { + String identifier, Meter meter) { this.tracerProvider = Objects.requireNonNull(tracerProvider, "'tracerProvider' cannot be null."); this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null."); this.connectionProcessor = Objects.requireNonNull(connectionProcessor, @@ -46,6 +48,7 @@ class EventHubAsyncClient implements Closeable { this.isSharedConnection = isSharedConnection; this.identifier = identifier; + this.meter = meter; } /** @@ -108,7 +111,7 @@ Mono getPartitionProperties(String partitionId) { EventHubProducerAsyncClient createProducer() { return new EventHubProducerAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), getEventHubName(), connectionProcessor, connectionProcessor.getRetryOptions(), tracerProvider, messageSerializer, scheduler, - isSharedConnection, onClientClose, identifier); + isSharedConnection, onClientClose, identifier, meter); } /** @@ -133,7 +136,7 @@ EventHubConsumerAsyncClient createConsumer(String consumerGroup, int prefetchCou return new EventHubConsumerAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), getEventHubName(), connectionProcessor, messageSerializer, consumerGroup, prefetchCount, isSharedConnection, - onClientClose, identifier); + onClientClose, identifier, meter); } /** diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java index 55cd67314eb21..cc9fca6d6c3c0 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java @@ -34,6 +34,8 @@ import com.azure.core.util.Configuration; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.metrics.Meter; +import com.azure.core.util.metrics.MeterProvider; import com.azure.core.util.tracing.Tracer; import com.azure.messaging.eventhubs.implementation.ClientConstants; import com.azure.messaging.eventhubs.implementation.EventHubAmqpConnection; @@ -174,6 +176,9 @@ public class EventHubClientBuilder implements private static final String EVENTHUBS_PROPERTIES_FILE = "azure-messaging-eventhubs.properties"; private static final String NAME_KEY = "name"; private static final String VERSION_KEY = "version"; + + private static final String LIBRARY_NAME; + private static final String LIBRARY_VERSION; private static final String UNKNOWN = "UNKNOWN"; private static final String AZURE_EVENT_HUBS_CONNECTION_STRING = "AZURE_EVENT_HUBS_CONNECTION_STRING"; @@ -199,6 +204,12 @@ public class EventHubClientBuilder implements private SslDomain.VerifyMode verifyMode; private URL customEndpointAddress; + static { + final Map properties = CoreUtils.getProperties(EVENTHUBS_PROPERTIES_FILE); + LIBRARY_NAME = properties.getOrDefault(NAME_KEY, UNKNOWN); + LIBRARY_VERSION = properties.getOrDefault(VERSION_KEY, UNKNOWN); + } + /** * Keeps track of the open clients that were created from this builder when there is a shared connection. */ @@ -794,13 +805,15 @@ EventHubAsyncClient buildAsyncClient() { prefetchCount = DEFAULT_PREFETCH_COUNT; } + final Meter meter = MeterProvider.getDefaultProvider().createMeter(LIBRARY_NAME, LIBRARY_VERSION, + clientOptions == null ? null : clientOptions.getMetricsOptions()); final MessageSerializer messageSerializer = new EventHubMessageSerializer(); final EventHubConnectionProcessor processor; if (isSharedConnection.get()) { synchronized (connectionLock) { if (eventHubConnectionProcessor == null) { - eventHubConnectionProcessor = buildConnectionProcessor(messageSerializer); + eventHubConnectionProcessor = buildConnectionProcessor(messageSerializer, meter); } } @@ -809,7 +822,7 @@ EventHubAsyncClient buildAsyncClient() { final int numberOfOpenClients = openClients.incrementAndGet(); LOGGER.info("# of open clients with shared connection: {}", numberOfOpenClients); } else { - processor = buildConnectionProcessor(messageSerializer); + processor = buildConnectionProcessor(messageSerializer, meter); } final TracerProvider tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class)); @@ -823,8 +836,7 @@ EventHubAsyncClient buildAsyncClient() { } return new EventHubAsyncClient(processor, tracerProvider, messageSerializer, scheduler, - isSharedConnection.get(), this::onClientClose, - identifier); + isSharedConnection.get(), this::onClientClose, identifier, meter); } /** @@ -884,7 +896,7 @@ void onClientClose() { } } - private EventHubConnectionProcessor buildConnectionProcessor(MessageSerializer messageSerializer) { + private EventHubConnectionProcessor buildConnectionProcessor(MessageSerializer messageSerializer, Meter meter) { final ConnectionOptions connectionOptions = getConnectionOptions(); final Flux connectionFlux = Flux.create(sink -> { sink.onRequest(request -> { @@ -906,7 +918,7 @@ private EventHubConnectionProcessor buildConnectionProcessor(MessageSerializer m connectionOptions.getAuthorizationType(), connectionOptions.getFullyQualifiedNamespace(), connectionOptions.getAuthorizationScope()); final ReactorProvider provider = new ReactorProvider(); - final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(provider); + final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(provider, meter); final EventHubAmqpConnection connection = new EventHubReactorAmqpConnection(connectionId, connectionOptions, getEventHubName(), provider, handlerProvider, tokenManagerProvider, @@ -959,18 +971,15 @@ private ConnectionOptions getConnectionOptions() { : SslDomain.VerifyMode.VERIFY_PEER_NAME; final ClientOptions options = clientOptions != null ? clientOptions : new ClientOptions(); - final Map properties = CoreUtils.getProperties(EVENTHUBS_PROPERTIES_FILE); - final String product = properties.getOrDefault(NAME_KEY, UNKNOWN); - final String clientVersion = properties.getOrDefault(VERSION_KEY, UNKNOWN); if (customEndpointAddress == null) { return new ConnectionOptions(getAndValidateFullyQualifiedNamespace(), credentials, authorizationType, ClientConstants.AZURE_ACTIVE_DIRECTORY_SCOPE, transport, retryOptions, proxyOptions, scheduler, - options, verificationMode, product, clientVersion); + options, verificationMode, LIBRARY_NAME, LIBRARY_VERSION); } else { return new ConnectionOptions(getAndValidateFullyQualifiedNamespace(), credentials, authorizationType, ClientConstants.AZURE_ACTIVE_DIRECTORY_SCOPE, transport, retryOptions, proxyOptions, scheduler, - options, verificationMode, product, clientVersion, customEndpointAddress.getHost(), + options, verificationMode, LIBRARY_NAME, LIBRARY_VERSION, customEndpointAddress.getHost(), customEndpointAddress.getPort()); } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java index e4732052970db..ed9453b682258 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java @@ -13,9 +13,11 @@ import com.azure.core.annotation.ServiceClient; import com.azure.core.annotation.ServiceMethod; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.metrics.Meter; import com.azure.messaging.eventhubs.implementation.AmqpReceiveLinkProcessor; import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor; import com.azure.messaging.eventhubs.implementation.EventHubManagementNode; +import com.azure.messaging.eventhubs.implementation.EventHubsMetricsProvider; import com.azure.messaging.eventhubs.models.EventPosition; import com.azure.messaging.eventhubs.models.PartitionEvent; import com.azure.messaging.eventhubs.models.ReceiveOptions; @@ -155,6 +157,7 @@ public class EventHubConsumerAsyncClient implements Closeable { private final boolean isSharedConnection; private final Runnable onClientClosed; private final String identifier; + private final EventHubsMetricsProvider metricsProvider; /** * Keeps track of the open partition consumers keyed by linkName. The link name is generated as: {@code * "partitionId_GUID"}. For receiving from all partitions, links are prefixed with {@code "all-GUID-partitionId"}. @@ -164,7 +167,7 @@ public class EventHubConsumerAsyncClient implements Closeable { EventHubConsumerAsyncClient(String fullyQualifiedNamespace, String eventHubName, EventHubConnectionProcessor connectionProcessor, MessageSerializer messageSerializer, String consumerGroup, - int prefetchCount, boolean isSharedConnection, Runnable onClientClosed, String identifier) { + int prefetchCount, boolean isSharedConnection, Runnable onClientClosed, String identifier, Meter meter) { this.fullyQualifiedNamespace = fullyQualifiedNamespace; this.eventHubName = eventHubName; this.connectionProcessor = connectionProcessor; @@ -174,6 +177,7 @@ public class EventHubConsumerAsyncClient implements Closeable { this.isSharedConnection = isSharedConnection; this.onClientClosed = onClientClosed; this.identifier = identifier; + this.metricsProvider = new EventHubsMetricsProvider(meter, fullyQualifiedNamespace, eventHubName, consumerGroup); } /** @@ -417,6 +421,7 @@ private Flux createConsumer(String linkName, String partitionId, .computeIfAbsent(linkName, name -> createPartitionConsumer(name, partitionId, startingPosition, receiveOptions)) .receive() + .doOnNext(event -> metricsProvider.reportReceive(event)) .doFinally(signal -> removeLink(linkName, partitionId, signal)); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java index ab1a09217b9ed..096138d537f45 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java @@ -18,10 +18,12 @@ import com.azure.core.util.Context; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.metrics.Meter; import com.azure.core.util.tracing.ProcessKind; import com.azure.messaging.eventhubs.implementation.ClientConstants; import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor; import com.azure.messaging.eventhubs.implementation.EventHubManagementNode; +import com.azure.messaging.eventhubs.implementation.EventHubsMetricsProvider; import com.azure.messaging.eventhubs.models.CreateBatchOptions; import com.azure.messaging.eventhubs.models.SendOptions; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; @@ -198,6 +200,8 @@ public class EventHubProducerAsyncClient implements Closeable { private final Runnable onClientClose; private final String identifier; + private final EventHubsMetricsProvider metricsProvider; + /** * Creates a new instance of this {@link EventHubProducerAsyncClient} that can send messages to a single partition * when {@link CreateBatchOptions#getPartitionId()} is not null or an empty string. Otherwise, allows the service to @@ -206,7 +210,7 @@ public class EventHubProducerAsyncClient implements Closeable { EventHubProducerAsyncClient(String fullyQualifiedNamespace, String eventHubName, EventHubConnectionProcessor connectionProcessor, AmqpRetryOptions retryOptions, TracerProvider tracerProvider, MessageSerializer messageSerializer, Scheduler scheduler, boolean isSharedConnection, Runnable onClientClose, - String identifier) { + String identifier, Meter meter) { this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null."); this.eventHubName = Objects.requireNonNull(eventHubName, "'eventHubName' cannot be null."); @@ -221,6 +225,7 @@ public class EventHubProducerAsyncClient implements Closeable { this.scheduler = scheduler; this.isSharedConnection = isSharedConnection; this.identifier = identifier; + this.metricsProvider = new EventHubsMetricsProvider(meter, fullyQualifiedNamespace, eventHubName, null); } /** @@ -583,8 +588,10 @@ public Mono send(EventDataBatch batch) { String.format("partitionId[%s]: Sending messages timed out.", batch.getPartitionId())) .publishOn(scheduler) .doOnEach(signal -> { + Context context = isTracingEnabled ? parentContext.get() : Context.NONE; + metricsProvider.reportBatchSend(batch, batch.getPartitionId(), signal.getThrowable(), context); if (isTracingEnabled) { - tracerProvider.endSpan(parentContext.get(), signal); + tracerProvider.endSpan(context, signal); } }); } @@ -632,7 +639,7 @@ private String getEntityPath(String partitionId) { private Mono getSendLink(String partitionId) { final String entityPath = getEntityPath(partitionId); - final String linkName = getEntityPath(partitionId); + final String linkName = entityPath; return connectionProcessor .flatMap(connection -> connection.createSendLink(linkName, entityPath, retryOptions, identifier)); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java new file mode 100644 index 0000000000000..54638e5aaecec --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java @@ -0,0 +1,110 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs.implementation; + +import com.azure.core.util.Context; +import com.azure.core.util.TelemetryAttributes; +import com.azure.core.util.metrics.DoubleHistogram; +import com.azure.core.util.metrics.LongCounter; +import com.azure.core.util.metrics.Meter; +import com.azure.messaging.eventhubs.EventDataBatch; +import com.azure.messaging.eventhubs.models.PartitionEvent; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_NAME_KEY; +import static com.azure.core.amqp.implementation.ClientConstants.HOSTNAME_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.CONSUMER_GROUP_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY; + +public class EventHubsMetricsProvider { + private static final String GENERIC_STATUS_KEY = "status"; + private final Meter meter; + private final boolean isEnabled; + + private AttributeCache sendAttributeCacheSuccess; + private AttributeCache sendAttributeCacheFailure; + private AttributeCache receiveAttributeCache; + private LongCounter sentEventsCounter; + private DoubleHistogram consumerLag; + + public EventHubsMetricsProvider(Meter meter, String namespace, String entityName, String consumerGroup) { + this.meter = meter; + this.isEnabled = meter != null && meter.isEnabled(); + if (this.isEnabled) { + Map commonAttributesMap = new HashMap<>(3); + commonAttributesMap.put(HOSTNAME_KEY, namespace); + commonAttributesMap.put(ENTITY_NAME_KEY, entityName); + if (consumerGroup != null) { + commonAttributesMap.put(CONSUMER_GROUP_KEY, consumerGroup); + } + + Map successMap = new HashMap<>(commonAttributesMap); + successMap.put(GENERIC_STATUS_KEY, "ok"); + this.sendAttributeCacheSuccess = new AttributeCache(PARTITION_ID_KEY, successMap); + + Map failureMap = new HashMap<>(commonAttributesMap); + failureMap.put(GENERIC_STATUS_KEY, "error"); + this.sendAttributeCacheFailure = new AttributeCache(PARTITION_ID_KEY, failureMap); + + this.receiveAttributeCache = new AttributeCache(PARTITION_ID_KEY, commonAttributesMap); + this.sentEventsCounter = meter.createLongCounter("messaging.eventhubs.events.sent", "Number of sent events", "events"); + this.consumerLag = meter.createDoubleHistogram("messaging.eventhubs.consumer.lag", "Difference between local time when event was received and the local time it was enqueued on broker.", "sec"); + } + } + + public void reportBatchSend(EventDataBatch batch, String partitionId, Throwable throwable, Context context) { + if (isEnabled && sentEventsCounter.isEnabled()) { + AttributeCache cache = throwable == null ? sendAttributeCacheSuccess : sendAttributeCacheFailure; + sentEventsCounter.add(batch.getCount(), cache.getOrCreate(partitionId), context); + } + } + + public void reportReceive(PartitionEvent event) { + if (isEnabled && consumerLag.isEnabled()) { + Instant enqueuedTime = event.getData().getEnqueuedTime(); + double diff = 0d; + if (enqueuedTime != null) { + diff = Instant.now().toEpochMilli() - enqueuedTime.toEpochMilli(); + if (diff < 0) { + // time skew on machines + diff = 0; + } + } + consumerLag.record(diff / 1000d, + receiveAttributeCache.getOrCreate(event.getPartitionContext().getPartitionId()), + Context.NONE); + } + } + + class AttributeCache { + private final Map attr = new ConcurrentHashMap<>(); + private final TelemetryAttributes commonAttr; + private final Map commonMap; + private final String dimensionName; + + AttributeCache(String dimensionName, Map common) { + this.dimensionName = dimensionName; + this.commonMap = common; + this.commonAttr = meter.createAttributes(commonMap); + } + + public TelemetryAttributes getOrCreate(String value) { + if (value == null) { + return commonAttr; + } + + return attr.computeIfAbsent(value, this::create); + } + + private TelemetryAttributes create(String value) { + Map attributes = new HashMap<>(commonMap); + attributes.put(dimensionName, value); + return meter.createAttributes(attributes); + } + } +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java index ec4748fbf0dce..2fcf3cdbb98ca 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java @@ -12,9 +12,13 @@ import com.azure.core.amqp.implementation.MessageSerializer; import com.azure.core.amqp.models.CbsAuthorizationType; import com.azure.core.credential.TokenCredential; +import com.azure.core.test.utils.metrics.TestHistogram; +import com.azure.core.test.utils.metrics.TestMeasurement; +import com.azure.core.test.utils.metrics.TestMeter; import com.azure.core.util.ClientOptions; import com.azure.core.util.Configuration; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.metrics.Meter; import com.azure.messaging.eventhubs.implementation.ClientConstants; import com.azure.messaging.eventhubs.implementation.EventHubAmqpConnection; import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor; @@ -23,6 +27,7 @@ import com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties; import com.azure.messaging.eventhubs.models.PartitionEvent; import com.azure.messaging.eventhubs.models.ReceiveOptions; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.engine.SslDomain; import org.apache.qpid.proton.message.Message; import org.junit.jupiter.api.AfterAll; @@ -48,16 +53,21 @@ import java.time.Duration; import java.time.Instant; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME; import static com.azure.messaging.eventhubs.EventHubClientBuilder.DEFAULT_PREFETCH_COUNT; import static com.azure.messaging.eventhubs.TestUtils.getMessage; import static com.azure.messaging.eventhubs.TestUtils.isMatchingEvent; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -86,6 +96,7 @@ class EventHubConsumerAsyncClientTest { private static final String CONSUMER_GROUP = "consumer-group-test"; private static final String PARTITION_ID = "a-partition-id"; private static final String CLIENT_IDENTIFIER = "my-client-identifier"; + private static final Meter DEFAULT_METER = null; private static final ClientLogger LOGGER = new ClientLogger(EventHubConsumerAsyncClientTest.class); private final AmqpRetryOptions retryOptions = new AmqpRetryOptions().setMaxRetries(2); @@ -148,7 +159,7 @@ AmqpTransportType.AMQP_WEB_SOCKETS, new AmqpRetryOptions(), ProxyOptions.SYSTEM_ "event-hub-name", connectionOptions.getRetry())); consumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, - CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER); + CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); } @AfterEach @@ -170,7 +181,7 @@ void teardown() throws Exception { @Test void lastEnqueuedEventInformationIsNull() { final EventHubConsumerAsyncClient runtimeConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, messageSerializer, CONSUMER_GROUP, DEFAULT_PREFETCH_COUNT, false, onClientClosed, CLIENT_IDENTIFIER); + connectionProcessor, messageSerializer, CONSUMER_GROUP, DEFAULT_PREFETCH_COUNT, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final int numberOfEvents = 10; when(amqpReceiveLink.getCredits()).thenReturn(numberOfEvents); final int numberToReceive = 3; @@ -193,7 +204,7 @@ void lastEnqueuedEventInformationIsNull() { void lastEnqueuedEventInformationCreated() { // Arrange final EventHubConsumerAsyncClient runtimeConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, messageSerializer, CONSUMER_GROUP, DEFAULT_PREFETCH_COUNT, false, onClientClosed, CLIENT_IDENTIFIER); + connectionProcessor, messageSerializer, CONSUMER_GROUP, DEFAULT_PREFETCH_COUNT, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final int numberOfEvents = 10; final ReceiveOptions receiveOptions = new ReceiveOptions().setTrackLastEnqueuedEventProperties(true); when(amqpReceiveLink.getCredits()).thenReturn(numberOfEvents); @@ -204,7 +215,7 @@ void lastEnqueuedEventInformationCreated() { .then(() -> sendMessages(messageProcessor, numberOfEvents, PARTITION_ID)) .assertNext(event -> { LastEnqueuedEventProperties properties = event.getLastEnqueuedEventProperties(); - Assertions.assertNotNull(properties); + assertNotNull(properties); Assertions.assertNull(properties.getOffset()); Assertions.assertNull(properties.getSequenceNumber()); Assertions.assertNull(properties.getRetrievalTime()); @@ -246,7 +257,7 @@ void receivesNumberOfEventsAllowsBlock() throws InterruptedException { // Scheduling on elastic to simulate a user passed in scheduler (this is the default in EventHubClientBuilder). final EventHubConsumerAsyncClient myConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER); + connectionProcessor, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final Flux eventsFlux = myConsumer.receiveFromPartition(PARTITION_ID, EventPosition.earliest()) .take(numberOfEvents); @@ -306,7 +317,7 @@ void returnsNewListener() { any(ReceiveOptions.class), anyString())).thenReturn(Mono.just(link2), Mono.just(link3)); EventHubConsumerAsyncClient asyncClient = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER); + eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); // Act & Assert StepVerifier.create(asyncClient.receiveFromPartition(PARTITION_ID, EventPosition.earliest()).take(numberOfEvents)) @@ -529,7 +540,7 @@ void receivesMultiplePartitions() { .thenReturn(Mono.just(new EventHubProperties(EVENT_HUB_NAME, Instant.EPOCH, partitions))); EventHubConsumerAsyncClient asyncClient = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER); + eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); TestPublisher processor2 = TestPublisher.createCold(); AmqpReceiveLink link2 = mock(AmqpReceiveLink.class); @@ -604,7 +615,7 @@ void receivesMultiplePartitionsWhenOneCloses() { .thenReturn(Mono.just(new EventHubProperties(EVENT_HUB_NAME, Instant.EPOCH, partitions))); EventHubConsumerAsyncClient asyncClient = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER); + eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); TestPublisher processor2 = TestPublisher.create(); AmqpReceiveLink link2 = mock(AmqpReceiveLink.class); @@ -664,7 +675,7 @@ void doesNotCloseSharedConnection() { EventHubConnectionProcessor eventHubConnection = Flux.create(sink -> sink.next(connection1)) .subscribeWith(new EventHubConnectionProcessor(HOSTNAME, EVENT_HUB_NAME, retryOptions)); EventHubConsumerAsyncClient sharedConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, true, onClientClosed, CLIENT_IDENTIFIER); + eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, true, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); // Act sharedConsumer.close(); @@ -684,7 +695,7 @@ void closesDedicatedConnection() { // Arrange EventHubConnectionProcessor hubConnection = mock(EventHubConnectionProcessor.class); EventHubConsumerAsyncClient dedicatedConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - hubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER); + hubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); // Act dedicatedConsumer.close(); @@ -695,6 +706,123 @@ void closesDedicatedConnection() { verifyNoInteractions(onClientClosed); } + @Test + void receiveReportsMetrics() { + // Arrange + final int numberOfEvents = 2; + when(amqpReceiveLink.getCredits()).thenReturn(numberOfEvents); + + TestMeter meter = new TestMeter(); + consumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, + CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, meter); + + Flux receive = consumer.receiveFromPartition(PARTITION_ID, EventPosition.earliest()) + .filter(e -> isMatchingEvent(e, messageTrackingUUID)) + .take(numberOfEvents); + + Instant enqueuedTime = Instant.now().minusSeconds(1000); + sendMessages(messageProcessor, numberOfEvents, PARTITION_ID, enqueuedTime); + + // Act + StepVerifier.create(receive) + .thenConsumeWhile(e -> { + TestHistogram consumerLag = meter.getHistograms().get("messaging.eventhubs.consumer.lag"); + assertNotNull(consumerLag); + Instant afterReceived = Instant.now(); + + List> measurements = consumerLag.getMeasurements(); + TestMeasurement last = measurements.get(measurements.size() - 1); + assertEquals(Duration.between(enqueuedTime, afterReceived).toMillis() / 1000d, last.getValue(), 1); + assertAttributes(EVENT_HUB_NAME, e.getPartitionContext().getPartitionId(), last.getAttributes()); + return true; + }) + .verifyComplete(); + + assertEquals(numberOfEvents, meter.getHistograms().get("messaging.eventhubs.consumer.lag").getMeasurements().size()); + } + + @Test + void receiveReportsMetricsNegativeLag() { + // Arrange + when(amqpReceiveLink.getCredits()).thenReturn(1); + + TestMeter meter = new TestMeter(); + consumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, + CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, meter); + + Flux receive = consumer.receiveFromPartition(PARTITION_ID, EventPosition.earliest()) + .filter(e -> isMatchingEvent(e, messageTrackingUUID)) + .take(1); + + Instant enqueuedTime1 = Instant.now().plusSeconds(1000); + sendMessages(messageProcessor, 1, PARTITION_ID, enqueuedTime1); + + // Act + StepVerifier.create(receive) + .consumeNextWith(e -> { + TestHistogram consumerLag = meter.getHistograms().get("messaging.eventhubs.consumer.lag"); + assertNotNull(consumerLag); + Instant afterReceived = Instant.now(); + + List> measurements = consumerLag.getMeasurements(); + TestMeasurement last = measurements.get(measurements.size() - 1); + assertEquals(0, last.getValue()); + assertAttributes(EVENT_HUB_NAME, e.getPartitionContext().getPartitionId(), last.getAttributes()); + }) + .verifyComplete(); + + assertEquals(1, meter.getHistograms().get("messaging.eventhubs.consumer.lag").getMeasurements().size()); + } + + @Test + void receiveDoesNotReportDisabledMetrics() { + // Arrange + when(amqpReceiveLink.getCredits()).thenReturn(1); + + TestMeter meter = new TestMeter(false); + consumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, + CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, meter); + + Flux receive = consumer.receiveFromPartition(PARTITION_ID, EventPosition.earliest()) + .filter(e -> isMatchingEvent(e, messageTrackingUUID)) + .take(1); + + // Act + sendMessages(messageProcessor, 1, PARTITION_ID); + StepVerifier.create(receive) + .expectNextCount(1) + .verifyComplete(); + + assertFalse(meter.getHistograms().containsKey("messaging.eventhubs.consumer.lag")); + } + + @Test + void receiveNullMeterDoesNotThrow() { + // Arrange + when(amqpReceiveLink.getCredits()).thenReturn(1); + + consumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, + CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, null); + + Flux receive = consumer.receiveFromPartition(PARTITION_ID, EventPosition.earliest()) + .filter(e -> isMatchingEvent(e, messageTrackingUUID)) + .take(1); + + // Act + sendMessages(messageProcessor, 1, PARTITION_ID); + StepVerifier.create(receive) + .expectNextCount(1) + .verifyComplete(); + } + + private void assertAttributes(String entityName, String entityPath, Map attributes) { + assertEquals(4, attributes.size()); + assertEquals(HOSTNAME, attributes.get("hostName")); + assertEquals(entityName, attributes.get("entityName")); + assertEquals(CONSUMER_GROUP, attributes.get("consumerGroup")); + assertEquals(entityPath, attributes.get("partitionId")); + } + private void assertPartition(String partitionId, PartitionEvent event) { System.out.println("Event received: " + event.getPartitionContext().getPartitionId()); final Object value = event.getData().getProperties().get(PARTITION_ID_HEADER); @@ -713,4 +841,14 @@ private void sendMessages(TestPublisher testPublisher, int numberOfEven testPublisher.next(getMessage(PAYLOAD_BYTES, messageTrackingUUID, map)); } } + + private void sendMessages(TestPublisher testPublisher, int numberOfEvents, String partitionId, Instant enqueueTime) { + Map map = Collections.singletonMap(PARTITION_ID_HEADER, partitionId); + + for (int i = 0; i < numberOfEvents; i++) { + Message message = getMessage(PAYLOAD_BYTES, messageTrackingUUID, map); + message.getMessageAnnotations().getValue().put(Symbol.valueOf(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue()), enqueueTime); + testPublisher.next(message); + } + } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientTest.java index 64881caece816..b1da9c2be2059 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientTest.java @@ -15,6 +15,7 @@ import com.azure.core.util.ClientOptions; import com.azure.core.util.Configuration; import com.azure.core.util.IterableStream; +import com.azure.core.util.metrics.Meter; import com.azure.messaging.eventhubs.implementation.ClientConstants; import com.azure.messaging.eventhubs.implementation.EventHubAmqpConnection; import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor; @@ -74,6 +75,7 @@ public class EventHubConsumerClientTest { private static final String CLIENT_IDENTIFIER = "my-client-identifier"; private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(4); private static final Duration TIMEOUT = Duration.ofSeconds(30); + private static final Meter DEFAULT_METER = null; private final String messageTrackingUUID = UUID.randomUUID().toString(); private final TestPublisher messageProcessor = TestPublisher.createCold(); @@ -138,7 +140,7 @@ AmqpTransportType.AMQP_WEB_SOCKETS, new AmqpRetryOptions(), ProxyOptions.SYSTEM_ when(connection.closeAsync()).thenReturn(Mono.empty()); asyncConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER); + connectionProcessor, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); consumer = new EventHubConsumerClient(asyncConsumer, Duration.ofSeconds(10)); } @@ -164,7 +166,7 @@ public void lastEnqueuedEventInformationIsNull() { // Arrange final EventHubConsumerAsyncClient runtimeConsumer = new EventHubConsumerAsyncClient( HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, CONSUMER_GROUP, - PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER); + PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final EventHubConsumerClient consumer = new EventHubConsumerClient(runtimeConsumer, Duration.ofSeconds(5)); final int numberOfEvents = 10; sendMessages(messageProcessor, numberOfEvents, PARTITION_ID); @@ -194,7 +196,7 @@ public void lastEnqueuedEventInformationCreated() { final ReceiveOptions options = new ReceiveOptions().setTrackLastEnqueuedEventProperties(true); final EventHubConsumerAsyncClient runtimeConsumer = new EventHubConsumerAsyncClient( HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, CONSUMER_GROUP, PREFETCH, - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final EventHubConsumerClient consumer = new EventHubConsumerClient(runtimeConsumer, Duration.ofSeconds(5)); final int numberOfEvents = 10; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java index bb83e04b454ad..72137f1a25201 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java @@ -17,10 +17,14 @@ import com.azure.core.amqp.implementation.TracerProvider; import com.azure.core.amqp.models.CbsAuthorizationType; import com.azure.core.credential.TokenCredential; +import com.azure.core.test.utils.metrics.TestCounter; +import com.azure.core.test.utils.metrics.TestMeasurement; +import com.azure.core.test.utils.metrics.TestMeter; import com.azure.core.util.ClientOptions; import com.azure.core.util.Configuration; import com.azure.core.util.Context; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.metrics.Meter; import com.azure.core.util.tracing.ProcessKind; import com.azure.core.util.tracing.Tracer; import com.azure.messaging.eventhubs.implementation.ClientConstants; @@ -55,6 +59,7 @@ import java.time.Instant; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -71,6 +76,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; @@ -94,7 +100,7 @@ class EventHubProducerAsyncClientTest { private static final String ENTITY_PATH = HOSTNAME + Configuration.getGlobalConfiguration() .get("AZURE_EVENTHUBS_ENDPOINT_SUFFIX", ".servicebus.windows.net"); private static final ClientLogger LOGGER = new ClientLogger(EventHubProducerAsyncClient.class); - + private static final Meter DEFAULT_METER = null; @Mock private AmqpSendLink sendLink; @Mock @@ -161,7 +167,7 @@ void setup(TestInfo testInfo) { new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(), "event-hub-path", connectionOptions.getRetry())); producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, testScheduler, false, onClientClosed, CLIENT_IDENTIFIER); + tracerProvider, messageSerializer, testScheduler, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); when(sendLink.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES)); when(sendLink2.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES)); @@ -255,7 +261,7 @@ void sendSingleMessageWithBlock() throws InterruptedException { // In our actual client builder, we allow this. final EventHubProducerAsyncClient flexibleProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, tracerProvider, messageSerializer, testScheduler, - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); // EC is the prefix they use when creating a link that sends to the service round-robin. when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), eq(retryOptions), eq(CLIENT_IDENTIFIER))) @@ -334,7 +340,7 @@ void sendStartSpanSingleMessage() { final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), any(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(sendLink)); @@ -402,7 +408,7 @@ void sendMessageRetrySpanTest() { TracerProvider tracerProvider = new TracerProvider(tracers); producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER); + tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final String failureKey = "fail"; final EventData testData = new EventData("test") @@ -539,7 +545,7 @@ void startMessageSpansOnCreateBatch() { TracerProvider tracerProvider = new TracerProvider(tracers); final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final AmqpSendLink link = mock(AmqpSendLink.class); when(link.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES)); @@ -822,6 +828,148 @@ void sendsAnEventDataBatch() { verify(link, times(2)).getLinkSize(); } + @Test + void sendsAnEventDataBatchWithMetrics() { + String eventHub1 = EVENT_HUB_NAME + "1"; + String eventHub2 = EVENT_HUB_NAME + "2"; + + when(connection.createSendLink(eq(eventHub1), eq(eventHub1), any(), any())).thenReturn(Mono.just(sendLink)); + when(connection.createSendLink(eq(eventHub2), eq(eventHub2), any(), any())).thenReturn(Mono.just(sendLink2)); + when(sendLink.send(anyList())).thenReturn(Mono.empty()); + when(sendLink.getHostname()).thenReturn(HOSTNAME); + when(sendLink.getEntityPath()).thenReturn(eventHub1); + when(sendLink.send(any(Message.class))).thenReturn(Mono.empty()); + + when(sendLink2.send(anyList())).thenReturn(Mono.empty()); + when(sendLink2.getHostname()).thenReturn(HOSTNAME); + when(sendLink2.getEntityPath()).thenReturn(eventHub1); + when(sendLink2.send(any(Message.class))).thenReturn(Mono.empty()); + + TestMeter meter = new TestMeter(); + EventHubProducerAsyncClient producer1 = new EventHubProducerAsyncClient(HOSTNAME, eventHub1, + connectionProcessor, retryOptions, + tracerProvider, messageSerializer, testScheduler, false, onClientClosed, CLIENT_IDENTIFIER, meter); + + EventHubProducerAsyncClient producer2 = new EventHubProducerAsyncClient(HOSTNAME, eventHub2, + connectionProcessor, retryOptions, + tracerProvider, messageSerializer, testScheduler, false, onClientClosed, CLIENT_IDENTIFIER, meter); + + StepVerifier.create(producer1.createBatch() + .flatMap(batch -> { + batch.tryAdd(new EventData("1")); + return producer1.send(batch); + }) + .then(producer2.createBatch(). + flatMap(batch -> { + batch.tryAdd(new EventData("2")); + batch.tryAdd(new EventData("3")); + return producer2.send(batch); + }))) + .verifyComplete(); + + TestCounter eventCounter = meter.getCounters().get("messaging.eventhubs.events.sent"); + assertNotNull(eventCounter); + + List> measurements = eventCounter.getMeasurements(); + assertEquals(2, measurements.size()); + + assertEquals(1, measurements.get(0).getValue()); + assertEquals(2, measurements.get(1).getValue()); + assertAttributes(eventHub1, null, "ok", measurements.get(0).getAttributes()); + assertAttributes(eventHub2, null, "ok", measurements.get(1).getAttributes()); + } + + @Test + void sendsAnEventDataBatchWithMetricsFailure() { + when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), any(), any())).thenReturn(Mono.just(sendLink)); + when(sendLink.send(anyList())).thenReturn(Mono.empty()); + when(sendLink.getHostname()).thenReturn(HOSTNAME); + when(sendLink.getEntityPath()).thenReturn(EVENT_HUB_NAME); + when(sendLink.send(any(Message.class))).thenReturn(Mono.error(new RuntimeException("foo"))); + + TestMeter meter = new TestMeter(); + EventHubProducerAsyncClient producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, + connectionProcessor, retryOptions, + tracerProvider, messageSerializer, testScheduler, false, onClientClosed, CLIENT_IDENTIFIER, meter); + + StepVerifier.create(producer.send(new EventData("1"))) + .expectErrorMessage("foo") + .verify(); + + TestCounter eventCounter = meter.getCounters().get("messaging.eventhubs.events.sent"); + assertNotNull(eventCounter); + + List> measurements = eventCounter.getMeasurements(); + assertEquals(1, measurements.size()); + + assertEquals(1, measurements.get(0).getValue()); + assertAttributes(EVENT_HUB_NAME, null, "error", measurements.get(0).getAttributes()); + } + + @Test + void sendsAnEventDataBatchWithMetricsPartitionId() { + String partitionId = "1"; + String entityPath = EVENT_HUB_NAME + "/Partitions/" + partitionId; + when(connection.createSendLink(eq(entityPath), eq(entityPath), any(), any())).thenReturn(Mono.just(sendLink)); + when(sendLink.send(anyList())).thenReturn(Mono.empty()); + when(sendLink.getHostname()).thenReturn(HOSTNAME); + when(sendLink.getEntityPath()).thenReturn(entityPath); + when(sendLink.getLinkName()).thenReturn(entityPath); + when(sendLink.send(any(Message.class))).thenReturn(Mono.empty()); + + TestMeter meter = new TestMeter(); + EventHubProducerAsyncClient producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, + connectionProcessor, retryOptions, tracerProvider, messageSerializer, testScheduler, false, onClientClosed, CLIENT_IDENTIFIER, meter); + + SendOptions options = new SendOptions().setPartitionId(partitionId); + StepVerifier.create(producer.send(new EventData("1"), options)) + .verifyComplete(); + + TestCounter eventCounter = meter.getCounters().get("messaging.eventhubs.events.sent"); + assertNotNull(eventCounter); + + List> measurements = eventCounter.getMeasurements(); + assertEquals(1, measurements.size()); + + assertEquals(1, measurements.get(0).getValue()); + assertAttributes(EVENT_HUB_NAME, partitionId, "ok", measurements.get(0).getAttributes()); + } + + @Test + void sendsAnEventDataBatchWithDisabledMetrics() { + when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), any(), any())).thenReturn(Mono.just(sendLink)); + when(sendLink.send(anyList())).thenReturn(Mono.empty()); + when(sendLink.getHostname()).thenReturn(HOSTNAME); + when(sendLink.getEntityPath()).thenReturn(EVENT_HUB_NAME); + when(sendLink.getLinkName()).thenReturn(EVENT_HUB_NAME); + when(sendLink.send(any(Message.class))).thenReturn(Mono.empty()); + + TestMeter meter = new TestMeter(false); + EventHubProducerAsyncClient producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, + connectionProcessor, retryOptions, tracerProvider, messageSerializer, testScheduler, false, onClientClosed, CLIENT_IDENTIFIER, meter); + + StepVerifier.create(producer.send(new EventData("1"))) + .verifyComplete(); + + assertFalse(meter.getCounters().containsKey("messaging.eventhubs.events.sent")); + } + + @Test + void sendsAnEventDataBatchWithNullMeterDoesNotThrow() { + when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), any(), any())).thenReturn(Mono.just(sendLink)); + when(sendLink.send(anyList())).thenReturn(Mono.empty()); + when(sendLink.getHostname()).thenReturn(HOSTNAME); + when(sendLink.getEntityPath()).thenReturn(EVENT_HUB_NAME); + when(sendLink.getLinkName()).thenReturn(EVENT_HUB_NAME); + when(sendLink.send(any(Message.class))).thenReturn(Mono.empty()); + + EventHubProducerAsyncClient producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, + connectionProcessor, retryOptions, tracerProvider, messageSerializer, testScheduler, false, onClientClosed, CLIENT_IDENTIFIER, null); + + StepVerifier.create(producer.send(new EventData("1"))) + .verifyComplete(); + } + /** * Verify we can send messages to multiple partitionIds with same sender. */ @@ -888,7 +1036,7 @@ void doesNotCloseSharedConnection() { EventHubConnectionProcessor hubConnection = mock(EventHubConnectionProcessor.class); EventHubProducerAsyncClient sharedProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, hubConnection, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), - true, onClientClosed, CLIENT_IDENTIFIER); + true, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); // Act sharedProducer.close(); @@ -907,7 +1055,7 @@ void closesDedicatedConnection() { EventHubConnectionProcessor hubConnection = mock(EventHubConnectionProcessor.class); EventHubProducerAsyncClient dedicatedProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, hubConnection, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); // Act dedicatedProducer.close(); @@ -926,7 +1074,7 @@ void closesDedicatedConnectionOnlyOnce() { EventHubConnectionProcessor hubConnection = mock(EventHubConnectionProcessor.class); EventHubProducerAsyncClient dedicatedProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, hubConnection, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); // Act dedicatedProducer.close(); @@ -962,7 +1110,7 @@ void reopensOnFailure() { new EventHubConnectionProcessor(EVENT_HUB_NAME, connectionOptions.getFullyQualifiedNamespace(), connectionOptions.getRetry())); producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER); + tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final int count = 4; final byte[] contents = TEST_CONTENTS.getBytes(UTF_8); @@ -1037,7 +1185,7 @@ void closesOnNonTransientFailure() { new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(), EVENT_HUB_NAME, connectionOptions.getRetry())); producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER); + tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final int count = 4; final byte[] contents = TEST_CONTENTS.getBytes(UTF_8); @@ -1113,7 +1261,7 @@ void resendMessageOnTransientLinkFailure() { new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(), EVENT_HUB_NAME, connectionOptions.getRetry())); producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER); + tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final int count = 4; final byte[] contents = TEST_CONTENTS.getBytes(UTF_8); @@ -1167,6 +1315,14 @@ void resendMessageOnTransientLinkFailure() { verifyNoInteractions(onClientClosed); } + private void assertAttributes(String entityName, String entityPath, String status, Map attributes) { + assertEquals(entityPath == null ? 3 : 4, attributes.size()); + assertEquals(HOSTNAME, attributes.get("hostName")); + assertEquals(entityName, attributes.get("entityName")); + assertEquals(entityPath, attributes.get("partitionId")); + assertEquals(status, attributes.get("status")); + } + private static final String TEST_CONTENTS = "SSLorem ipsum dolor sit amet, consectetur adipiscing elit. Donec " + "vehicula posuere lobortis. Aliquam finibus volutpat dolor, faucibus pellentesque ipsum bibendum vitae. " + "Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Ut sit amet " diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java index fd3582cc80b9a..d751e4b57fd8d 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java @@ -18,6 +18,7 @@ import com.azure.core.credential.TokenCredential; import com.azure.core.util.ClientOptions; import com.azure.core.util.Context; +import com.azure.core.util.metrics.Meter; import com.azure.core.util.tracing.ProcessKind; import com.azure.core.util.tracing.Tracer; import com.azure.messaging.eventhubs.implementation.ClientConstants; @@ -77,6 +78,7 @@ public class EventHubProducerClientTest { private static final String HOSTNAME = "my-host-name"; private static final String EVENT_HUB_NAME = "my-event-hub-name"; private static final String CLIENT_IDENTIFIER = "my-client-identifier"; + private static final Meter DEFAULT_METER = null; private final AmqpRetryOptions retryOptions = new AmqpRetryOptions().setTryTimeout(Duration.ofSeconds(30)); private final MessageSerializer messageSerializer = new EventHubMessageSerializer(); @@ -117,7 +119,7 @@ public void setup() { .subscribeWith(new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(), "event-hub-path", connectionOptions.getRetry())); asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER); + tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); when(connection.getEndpointStates()).thenReturn(Flux.create(sink -> sink.next(AmqpEndpointState.ACTIVE))); when(connection.closeAsync()).thenReturn(Mono.empty()); @@ -171,7 +173,7 @@ public void sendStartSpanSingleMessage() { final TracerProvider tracerProvider = new TracerProvider(tracers); final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); final EventData eventData = new EventData("hello-world".getBytes(UTF_8)); @@ -246,7 +248,7 @@ public void sendMessageRetrySpanTest() { final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); final EventData eventData = new EventData("hello-world".getBytes(UTF_8)) .addContext(SPAN_CONTEXT_KEY, "span-context"); @@ -300,7 +302,7 @@ public void sendEventsExceedsBatchSize() { TracerProvider tracerProvider = new TracerProvider(Collections.emptyList()); final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); //Act & Assert @@ -403,7 +405,7 @@ public void startsMessageSpanOnEventBatch() { final TracerProvider tracerProvider = new TracerProvider(tracers); final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); final AmqpSendLink link = mock(AmqpSendLink.class); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java index 9da77be80884d..9ddf74d072824 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java @@ -41,7 +41,7 @@ import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY; import static com.azure.core.util.tracing.Tracer.MESSAGE_ENQUEUED_TIME; -import static com.azure.core.util.tracing.Tracer.PARENT_SPAN_KEY; +import static com.azure.core.util.tracing.Tracer.PARENT_TRACE_CONTEXT_KEY; import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; import static com.azure.messaging.eventhubs.EventHubClientBuilder.DEFAULT_PREFETCH_COUNT; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -147,7 +147,7 @@ public void testWithSimplePartitionProcessor() throws Exception { return passed.addData(SPAN_CONTEXT_KEY, "value1") .addData("scope", (AutoCloseable) () -> { }) - .addData(PARENT_SPAN_KEY, "value2"); + .addData(PARENT_TRACE_CONTEXT_KEY, "value2"); } ); @@ -239,7 +239,7 @@ public void testProcessSpans() throws Exception { assertTrue(passed.getData(MESSAGE_ENQUEUED_TIME).isPresent()); return passed.addData(SPAN_CONTEXT_KEY, "value1").addData("scope", (AutoCloseable) () -> { return; - }).addData(PARENT_SPAN_KEY, "value2"); + }).addData(PARENT_TRACE_CONTEXT_KEY, "value2"); } ); @@ -303,7 +303,7 @@ public void testProcessSpansWithoutDiagnosticId() throws Exception { assertTrue(passed.getData(MESSAGE_ENQUEUED_TIME).isPresent()); return passed.addData(SPAN_CONTEXT_KEY, "value1").addData("scope", (AutoCloseable) () -> { return; - }).addData(PARENT_SPAN_KEY, "value2"); + }).addData(PARENT_TRACE_CONTEXT_KEY, "value2"); } ); @@ -628,7 +628,7 @@ public void testSingleEventReceiveHeartBeat() throws InterruptedException { Context passed = invocation.getArgument(1, Context.class); return passed.addData(SPAN_CONTEXT_KEY, "value1").addData("scope", (AutoCloseable) () -> { return; - }).addData(PARENT_SPAN_KEY, "value2"); + }).addData(PARENT_TRACE_CONTEXT_KEY, "value2"); } ); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/CBSChannelTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/CBSChannelTest.java index ef750a325041f..b363ef59af407 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/CBSChannelTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/CBSChannelTest.java @@ -20,6 +20,7 @@ import com.azure.core.amqp.implementation.TokenManagerProvider; import com.azure.core.amqp.models.CbsAuthorizationType; import com.azure.core.credential.TokenCredential; +import com.azure.core.test.utils.metrics.TestMeter; import com.azure.core.util.ClientOptions; import com.azure.core.util.CoreUtils; import com.azure.core.util.Header; @@ -89,7 +90,8 @@ protected void beforeTest() { retryOptions = new AmqpRetryOptions().setTryTimeout(Duration.ofMinutes(1)); reactorProvider = new ReactorProvider(); - handlerProvider = new ReactorHandlerProvider(reactorProvider); + + handlerProvider = new ReactorHandlerProvider(reactorProvider, new TestMeter(false)); clientOptions.setHeaders( Arrays.asList(new Header("name", product), new Header("version", clientVersion))); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/EventHubReactorConnectionTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/EventHubReactorConnectionTest.java index c4e35363a7ec5..f00225382b96d 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/EventHubReactorConnectionTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/EventHubReactorConnectionTest.java @@ -6,6 +6,7 @@ import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.ProxyOptions; +import com.azure.core.amqp.implementation.AmqpMetricsProvider; import com.azure.core.amqp.implementation.ConnectionOptions; import com.azure.core.amqp.implementation.MessageSerializer; import com.azure.core.amqp.implementation.ReactorDispatcher; @@ -124,7 +125,7 @@ AmqpTransportType.AMQP, new AmqpRetryOptions(), proxy, scheduler, clientOptions, final SslPeerDetails peerDetails = Proton.sslPeerDetails(HOSTNAME, ConnectionHandler.AMQPS_PORT); connectionHandler = new ConnectionHandler(CONNECTION_ID, connectionOptions, - peerDetails); + peerDetails, AmqpMetricsProvider.noop()); when(reactor.selectable()).thenReturn(selectable); when(reactor.connectionToHost(connectionHandler.getHostname(), connectionHandler.getProtocolPort(), @@ -140,7 +141,7 @@ AmqpTransportType.AMQP, new AmqpRetryOptions(), proxy, scheduler, clientOptions, .thenReturn(reactor); final SessionHandler sessionHandler = new SessionHandler(CONNECTION_ID, HOSTNAME, "EVENT_HUB", - reactorDispatcher, Duration.ofSeconds(20)); + reactorDispatcher, Duration.ofSeconds(20), AmqpMetricsProvider.noop()); when(handlerProvider.createConnectionHandler(CONNECTION_ID, connectionOptions)) .thenReturn(connectionHandler); @@ -178,10 +179,10 @@ public void getsManagementChannel() { when(receiver.attachments()).thenReturn(linkRecord); when(handlerProvider.createReceiveLinkHandler(eq(CONNECTION_ID), eq(HOSTNAME), anyString(), anyString())) - .thenReturn(new ReceiveLinkHandler(CONNECTION_ID, HOSTNAME, "receiver-name", "test-entity-path")); + .thenReturn(new ReceiveLinkHandler(CONNECTION_ID, HOSTNAME, "receiver-name", "test-entity-path", AmqpMetricsProvider.noop())); when(handlerProvider.createSendLinkHandler(eq(CONNECTION_ID), eq(HOSTNAME), anyString(), anyString())) - .thenReturn(new SendLinkHandler(CONNECTION_ID, HOSTNAME, "sender-name", "test-entity-path")); + .thenReturn(new SendLinkHandler(CONNECTION_ID, HOSTNAME, "sender-name", "test-entity-path", AmqpMetricsProvider.noop())); final EventHubReactorAmqpConnection connection = new EventHubReactorAmqpConnection(CONNECTION_ID, connectionOptions, "event-hub-name", reactorProvider, handlerProvider, tokenManagerProvider,