From 595e10e859b20794d9dd7e8bb8f768f8bf8cf278 Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Fri, 16 Sep 2022 17:13:11 -0700 Subject: [PATCH 1/5] EventHubs metrics: consumer lag and sent events counter --- .../pom.xml | 6 + .../blob/BlobCheckpointStore.java | 28 +- .../checkpointstore/blob/MetricsHelper.java | 183 ++++++++ ...-eventhubs-checkpointstore-blob.properties | 2 + .../blob/BlobCheckpointStoreMetricsTests.java | 401 ++++++++++++++++++ .../BlobEventProcessorClientStoreTest.java | 2 +- .../blob/TestMeterProvider.java | 26 ++ .../eventhubs/EventHubAsyncClient.java | 9 +- .../eventhubs/EventHubClientBuilder.java | 30 +- .../EventHubConsumerAsyncClient.java | 7 +- .../EventHubProducerAsyncClient.java | 26 +- .../EventHubsMetricsProvider.java | 108 +++++ .../EventHubConsumerAsyncClientTest.java | 158 ++++++- .../eventhubs/EventHubConsumerClientTest.java | 8 +- .../EventHubProducerAsyncClientTest.java | 180 +++++++- .../eventhubs/EventHubProducerClientTest.java | 12 +- .../eventhubs/EventProcessorClientTest.java | 10 +- .../implementation/CBSChannelTest.java | 4 +- .../EventHubReactorConnectionTest.java | 9 +- 19 files changed, 1140 insertions(+), 69 deletions(-) create mode 100644 sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java create mode 100644 sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/resources/azure-messaging-eventhubs-checkpointstore-blob.properties create mode 100644 sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStoreMetricsTests.java create mode 100644 sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/TestMeterProvider.java create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/pom.xml b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/pom.xml index 0bd716a1421e7..52a1424c186c9 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/pom.xml +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/pom.xml @@ -58,6 +58,12 @@ + + com.azure + azure-core-test + 1.12.0 + test + com.azure azure-identity diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java index 88d933c67e949..79429a78e7865 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java @@ -4,6 +4,7 @@ package com.azure.messaging.eventhubs.checkpointstore.blob; import com.azure.core.http.rest.Response; +import com.azure.core.util.ClientOptions; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.CheckpointStore; @@ -12,22 +13,22 @@ import com.azure.messaging.eventhubs.models.PartitionOwnership; import com.azure.storage.blob.BlobAsyncClient; import com.azure.storage.blob.BlobContainerAsyncClient; -import com.azure.storage.blob.models.BlobRequestConditions; import com.azure.storage.blob.models.BlobItem; -import com.azure.storage.blob.models.BlobListDetails; import com.azure.storage.blob.models.BlobItemProperties; +import com.azure.storage.blob.models.BlobListDetails; +import com.azure.storage.blob.models.BlobRequestConditions; import com.azure.storage.blob.models.ListBlobsOptions; -import java.util.List; -import java.util.Objects; -import java.util.function.Function; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import static java.nio.charset.StandardCharsets.UTF_8; @@ -66,6 +67,7 @@ public class BlobCheckpointStore implements CheckpointStore { private static final ClientLogger LOGGER = new ClientLogger(BlobCheckpointStore.class); private final BlobContainerAsyncClient blobContainerAsyncClient; + private final MetricsHelper metricsHelper; private final Map blobClients = new ConcurrentHashMap<>(); /** @@ -75,7 +77,20 @@ public class BlobCheckpointStore implements CheckpointStore { * blobs in the storage container. */ public BlobCheckpointStore(BlobContainerAsyncClient blobContainerAsyncClient) { + this(blobContainerAsyncClient, null); + } + + + /** + * Creates an instance of BlobCheckpointStore. + * + * @param blobContainerAsyncClient The {@link BlobContainerAsyncClient} this instance will use to read and update + * @param options The {@link ClientOptions} to configure this instance. + * blobs in the storage container. + */ + public BlobCheckpointStore(BlobContainerAsyncClient blobContainerAsyncClient, ClientOptions options) { this.blobContainerAsyncClient = blobContainerAsyncClient; + this.metricsHelper = new MetricsHelper(options == null ? null : options.getMetricsOptions()); } /** @@ -256,6 +271,9 @@ public Mono updateCheckpoint(Checkpoint checkpoint) { return blobAsyncClient.getBlockBlobAsyncClient().uploadWithResponse(Flux.just(UPLOAD_DATA), 0, null, metadata, null, null, null).then(); } + }) + .doOnEach(signal -> { + metricsHelper.reportCheckpoint(checkpoint, blobName, !signal.hasError()); }); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java new file mode 100644 index 0000000000000..7f69e7631e3f0 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java @@ -0,0 +1,183 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs.checkpointstore.blob; + +import com.azure.core.util.Context; +import com.azure.core.util.CoreUtils; +import com.azure.core.util.MetricsOptions; +import com.azure.core.util.TelemetryAttributes; +import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.metrics.LongCounter; +import com.azure.core.util.metrics.LongGauge; +import com.azure.core.util.metrics.Meter; +import com.azure.core.util.metrics.MeterProvider; +import com.azure.messaging.eventhubs.models.Checkpoint; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +class MetricsHelper { + private static final ClientLogger LOGGER = new ClientLogger(MetricsHelper.class); + public static final String ENTITY_NAME_KEY = "entityName"; + public static final String HOSTNAME_KEY = "hostName"; + public static final String PARTITION_ID_KEY = "partitionId"; + public static final String CONSUMER_GROUP_KEY = "consumerGroup"; + + // since checkpoint store is stateless it might be used for endless number of eventhubs. + // we'll have as many subscriptions as there are combinations of fqdn, eventhub name, partitionId and consumer group. + // In the unlikely case it's shared across a lot of EH client instances, metrics would be too costly + // and unhelpful. So, let's just set a hard limit on number of subscriptions. + private static final int MAX_ATTRIBUTES_SETS = 100; + + private static final String PROPERTIES_FILE = "azure-messaging-eventhubs-checkpointstore-blob.properties"; + private static final String NAME_KEY = "name"; + private static final String VERSION_KEY = "version"; + private static final String LIBRARY_NAME; + private static final String LIBRARY_VERSION; + private static MeterProvider meterProvider = MeterProvider.getDefaultProvider(); + + private static final String UNKNOWN = "UNKNOWN"; + + static { + final Map properties = CoreUtils.getProperties(PROPERTIES_FILE); + LIBRARY_NAME = properties.getOrDefault(NAME_KEY, UNKNOWN); + LIBRARY_VERSION = properties.getOrDefault(VERSION_KEY, UNKNOWN); + } + + private final ConcurrentHashMap common = new ConcurrentHashMap<>(); + private final ConcurrentHashMap checkpointFailure = new ConcurrentHashMap<>(); + private final ConcurrentHashMap checkpointSuccess = new ConcurrentHashMap<>(); + private final ConcurrentHashMap seqNoSubscriptions = new ConcurrentHashMap<>(); + + private volatile boolean maxCapacityReached = false; + + private final Meter meter; + private final LongGauge lastSequenceNumber; + private final LongCounter checkpointCounter; + private final boolean isEnabled; + MetricsHelper(MetricsOptions metricsOptions) { + if (metricsOptions != null && !metricsOptions.isEnabled()) { + this.meter = null; + } else { + this.meter = meterProvider.createMeter(LIBRARY_NAME, LIBRARY_VERSION, metricsOptions); + } + this.isEnabled = meter != null && meter.isEnabled(); + if (isEnabled) { + this.lastSequenceNumber = this.meter.createLongGauge("messaging.eventhubs.checkpoint.sequence_number", "Last successfully checkpointed sequence number.", "seqNo"); + this.checkpointCounter = this.meter.createLongCounter("messaging.eventhubs.checkpoint", "Number of checkpoints.", null); + } else { + this.lastSequenceNumber = null; + this.checkpointCounter = null; + } + } + + /** + * Sets test meter provider. + */ + static synchronized void setMeterProvider(MeterProvider testMeterProvider) { + // TODO (lmolkova) add TestMeterProvider to azure-core-test + MetricsHelper.meterProvider = testMeterProvider; + } + + void reportCheckpoint(Checkpoint checkpoint, String attributesId, boolean success) { + if (!isEnabled || !(lastSequenceNumber.isEnabled() && checkpointCounter.isEnabled())) { + return; + } + + if (!maxCapacityReached && (seqNoSubscriptions.size() >= MAX_ATTRIBUTES_SETS || common.size() >= MAX_ATTRIBUTES_SETS)) { + LOGGER.error("Too many attribute combinations are reported for checkpoint metrics, ignoring any new dimensions."); + maxCapacityReached = true; + } + + if (lastSequenceNumber.isEnabled() && success) { + updateCurrentValue(attributesId, checkpoint); + } + + if (checkpointCounter.isEnabled()) { + TelemetryAttributes attributes = null; + if (success) { + attributes = getOrCreate(checkpointSuccess, attributesId, checkpoint, "ok"); + } else { + attributes = getOrCreate(checkpointFailure, attributesId, checkpoint, "error"); + } + if (attributes != null) { + checkpointCounter.add(1, attributes, Context.NONE); + } + } + } + + private TelemetryAttributes getOrCreate(ConcurrentHashMap source, String attributesId, Checkpoint checkpoint, String status) { + if (maxCapacityReached) { + return source.get(attributesId); + } + + return source.computeIfAbsent(attributesId, i -> meter.createAttributes(createAttributes(checkpoint, status))); + } + + private Map createAttributes(Checkpoint checkpoint, String status) { + Map attributesMap = new HashMap<>(); + attributesMap.put(HOSTNAME_KEY, checkpoint.getFullyQualifiedNamespace()); + attributesMap.put(ENTITY_NAME_KEY, checkpoint.getEventHubName()); + attributesMap.put(PARTITION_ID_KEY, checkpoint.getPartitionId()); + attributesMap.put(CONSUMER_GROUP_KEY, checkpoint.getConsumerGroup()); + if (status != null) { + attributesMap.put("status", status); + } + + return attributesMap; + } + + private void updateCurrentValue(String attributesId, Checkpoint checkpoint) { + if (checkpoint.getSequenceNumber() == null) { + return; + } + + CurrentValue valueSupplier; + if (maxCapacityReached) { + valueSupplier = seqNoSubscriptions.get(attributesId); + } else { + + TelemetryAttributes attributes = getOrCreate(common, attributesId, checkpoint, null); + if (attributes == null) { + return; + } + + valueSupplier = seqNoSubscriptions.computeIfAbsent(attributesId, a -> { + AtomicReference lastSeqNo = new AtomicReference<>(); + return new CurrentValue(lastSequenceNumber.registerCallback(() -> lastSeqNo.get(), attributes), lastSeqNo); + }); + } + + if (valueSupplier != null) { + valueSupplier.set(checkpoint.getSequenceNumber()); + } + } + + private static class CurrentValue { + private final AtomicReference lastSeqNo; + private final AutoCloseable subscription; + + CurrentValue(AutoCloseable subscription, AtomicReference lastSeqNo) { + this.subscription = subscription; + this.lastSeqNo = lastSeqNo; + } + + void set(long value) { + lastSeqNo.set(value); + } + + void close() { + if (subscription != null) { + try { + subscription.close(); + } catch (Exception e) { + // should never happen + throw LOGGER.logThrowableAsWarning(new RuntimeException(e)); + } + } + } + } +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/resources/azure-messaging-eventhubs-checkpointstore-blob.properties b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/resources/azure-messaging-eventhubs-checkpointstore-blob.properties new file mode 100644 index 0000000000000..ca812989b4f27 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/resources/azure-messaging-eventhubs-checkpointstore-blob.properties @@ -0,0 +1,2 @@ +name=${project.artifactId} +version=${project.version} diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStoreMetricsTests.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStoreMetricsTests.java new file mode 100644 index 0000000000000..1bd402842ff56 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStoreMetricsTests.java @@ -0,0 +1,401 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs.checkpointstore.blob; + +import com.azure.core.http.HttpHeaders; +import com.azure.core.http.rest.PagedFlux; +import com.azure.core.http.rest.PagedResponseBase; +import com.azure.core.test.utils.metrics.TestCounter; +import com.azure.core.test.utils.metrics.TestGauge; +import com.azure.core.test.utils.metrics.TestMeasurement; +import com.azure.core.test.utils.metrics.TestMeter; +import com.azure.core.util.ClientOptions; +import com.azure.core.util.MetricsOptions; +import com.azure.core.util.metrics.Meter; +import com.azure.messaging.eventhubs.models.Checkpoint; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobContainerAsyncClient; +import com.azure.storage.blob.models.BlobItem; +import com.azure.storage.blob.models.ListBlobsOptions; +import com.azure.storage.blob.specialized.BlockBlobAsyncClient; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.junit.jupiter.api.parallel.Isolated; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@Execution(ExecutionMode.SAME_THREAD) +@Isolated +public class BlobCheckpointStoreMetricsTests { + + private static final int MAX_ATTRIBUTES_SETS = 100; + + @Mock + private BlobContainerAsyncClient blobContainerAsyncClient; + + @Mock + private BlockBlobAsyncClient blockBlobAsyncClient; + + @Mock + private BlobAsyncClient blobAsyncClient; + + @BeforeEach + public void setUp() { + MockitoAnnotations.openMocks(this); + BlobItem blobItem = getCheckpointBlobItem("230", "1", "ns/eh/cg/checkpoint/0"); + PagedFlux response = new PagedFlux(() -> Mono.just(new PagedResponseBase(null, 200, null, + Arrays.asList(blobItem), null, + null))); + + when(blobContainerAsyncClient.getBlobAsyncClient("ns/eh/cg/checkpoint/0")).thenReturn(blobAsyncClient); + when(blobContainerAsyncClient.listBlobs(any(ListBlobsOptions.class))).thenReturn(response); + when(blobAsyncClient.getBlockBlobAsyncClient()).thenReturn(blockBlobAsyncClient); + when(blobAsyncClient.exists()).thenReturn(Mono.just(true)); + when(blobAsyncClient.setMetadata(ArgumentMatchers.>any())) + .thenReturn(Mono.empty()); + } + + @Test + public void testUpdateDisabledMetrics() { + Checkpoint checkpoint = new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh") + .setConsumerGroup("cg") + .setPartitionId("0") + .setSequenceNumber(2L) + .setOffset(100L); + + Meter meter = mock(Meter.class); + TestMeterProvider testProvider = new TestMeterProvider((lib, ver, opts) -> { + assertEquals("azure-messaging-eventhubs-checkpointstore-blob", lib); + assertNotNull(ver); + assertNull(opts); + return meter; + }); + + when(meter.isEnabled()).thenReturn(false); + MetricsHelper.setMeterProvider(testProvider); + + BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient); + StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint)).verifyComplete(); + + verify(meter, atLeastOnce()).isEnabled(); + verify(meter, never()).createAttributes(anyMap()); + verify(meter, never()).createLongGauge(any(), any(), any()); + verify(meter, never()).createLongCounter(any(), any(), any()); + } + + @Test + public void testUpdateDisabledMetricsViaOptions() { + Checkpoint checkpoint = new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh") + .setConsumerGroup("cg") + .setPartitionId("0") + .setSequenceNumber(2L) + .setOffset(100L); + + Meter meter = mock(Meter.class); + TestMeterProvider testProvider = new TestMeterProvider((lib, ver, opts) -> meter); + MetricsHelper.setMeterProvider(testProvider); + + ClientOptions disabledMetrics = new ClientOptions().setMetricsOptions(new MetricsOptions().setEnabled(false)); + BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient, disabledMetrics); + StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint)).verifyComplete(); + + verify(meter, never()).createAttributes(anyMap()); + verify(meter, never()).createLongGauge(any(), any(), any()); + verify(meter, never()).createLongCounter(any(), any(), any()); + } + + @Test + public void testUpdateEnabledMetrics() { + Checkpoint checkpoint = new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh") + .setConsumerGroup("cg") + .setPartitionId("0") + .setSequenceNumber(2L) + .setOffset(100L); + + TestMeter meter = new TestMeter(); + TestMeterProvider testProvider = new TestMeterProvider((lib, ver, opts) -> { + assertEquals("azure-messaging-eventhubs-checkpointstore-blob", lib); + assertNotNull(ver); + assertNull(opts); + return meter; + }); + + MetricsHelper.setMeterProvider(testProvider); + + BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient); + StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint)).verifyComplete(); + + assertTrue(meter.getGauges().containsKey("messaging.eventhubs.checkpoint.sequence_number")); + TestGauge seqNo = meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number"); + assertEquals(1, seqNo.getSubscriptions().size()); + TestGauge.Subscription subs = seqNo.getSubscriptions().get(0); + + assertEquals(0, subs.getMeasurements().size()); + subs.measure(); + + TestMeasurement seqNoMeasurement = subs.getMeasurements().get(0); + assertEquals(2L, seqNoMeasurement.getValue()); + assertCommonAttributes(checkpoint, seqNoMeasurement.getAttributes()); + + assertTrue(meter.getCounters().containsKey("messaging.eventhubs.checkpoint")); + TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoint"); + assertEquals(1, checkpoints.getMeasurements().size()); + TestMeasurement checkpointMeasurements = checkpoints.getMeasurements().get(0); + assertEquals(1, checkpointMeasurements.getValue()); + assertStatusAttributes(checkpoint, "ok", checkpointMeasurements.getAttributes()); + } + + + @Test + public void testUpdateEnabledMetricsFailure() { + Checkpoint checkpoint = new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh") + .setConsumerGroup("cg") + .setPartitionId("0") + .setSequenceNumber(2L) + .setOffset(100L); + + TestMeter meter = new TestMeter(); + MetricsHelper.setMeterProvider(new TestMeterProvider((lib, ver, opts) -> meter)); + + BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient); + when(blobAsyncClient.exists()).thenReturn(Mono.defer(() -> Mono.error(new RuntimeException("foo")))); + + StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint)) + .expectErrorMatches(e -> e instanceof RuntimeException) + .verify(); + + // sequence number is only reported for successfull checkpoints + assertEquals(0, meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number").getSubscriptions().size()); + + TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoint"); + TestMeasurement checkpointMeasurements = checkpoints.getMeasurements().get(0); + assertEquals(1, checkpointMeasurements.getValue()); + assertStatusAttributes(checkpoint, "error", checkpointMeasurements.getAttributes()); + } + + @Test + public void testUpdateEnabledMetricsNullSeqNo() { + Checkpoint checkpoint = new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh") + .setConsumerGroup("cg") + .setPartitionId("0") + .setOffset(100L); + + TestMeter meter = new TestMeter(); + MetricsHelper.setMeterProvider(new TestMeterProvider((lib, ver, opts) -> meter)); + + BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient); + StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint)).verifyComplete(); + + assertEquals(0, meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number").getSubscriptions().size()); + + TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoint"); + TestMeasurement checkpointMeasurements = checkpoints.getMeasurements().get(0); + assertEquals(1, checkpointMeasurements.getValue()); + assertStatusAttributes(checkpoint, "ok", checkpointMeasurements.getAttributes()); + } + + @Test + public void testUpdateEnabledMetricsTooManyAttributes() { + TestMeter meter = new TestMeter(); + MetricsHelper.setMeterProvider(new TestMeterProvider((lib, ver, opts) -> meter)); + + BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient); + when(blobContainerAsyncClient.getBlobAsyncClient(any())).thenReturn(blobAsyncClient); + + List checkpoints = IntStream.range(0, MAX_ATTRIBUTES_SETS + 10) + .mapToObj(n -> new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh") + .setConsumerGroup("cg") + .setPartitionId(String.valueOf(n)) + .setSequenceNumber((long) n) + .setOffset(100L)) + .collect(Collectors.toList()); + StepVerifier.create(Flux.fromIterable(checkpoints) + .flatMap(c -> blobCheckpointStore.updateCheckpoint(c))) + .verifyComplete(); + + List subscriptions = meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number").getSubscriptions(); + assertEquals(MAX_ATTRIBUTES_SETS, subscriptions.size()); + subscriptions.forEach(subs -> subs.measure()); + + final int[] i = {0}; + subscriptions.forEach(subs -> { + assertEquals(1, subs.getMeasurements().size()); + TestMeasurement seqNoMeasurement = subs.getMeasurements().get(0); + assertEquals(i[0], seqNoMeasurement.getValue()); + assertCommonAttributes(checkpoints.get(i[0]), seqNoMeasurement.getAttributes()); + i[0]++; + }); + + TestCounter checkpointCounter = meter.getCounters().get("messaging.eventhubs.checkpoint"); + assertEquals(MAX_ATTRIBUTES_SETS, checkpointCounter.getMeasurements().size()); + + final int[] j = {0}; + checkpointCounter.getMeasurements().forEach(m -> { + assertEquals(1, m.getValue()); + assertStatusAttributes(checkpoints.get(j[0]), "ok", m.getAttributes()); + j[0]++; + }); + } + + @Test + public void testUpdateEnabledMetricsMultipleMeasurements() { + Checkpoint checkpoint1 = new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh") + .setConsumerGroup("cg") + .setPartitionId("0") + .setSequenceNumber(2L) + .setOffset(100L); + + Checkpoint checkpoint2 = new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh") + .setConsumerGroup("cg") + .setPartitionId("0") + .setSequenceNumber(42L) + .setOffset(100L); + + TestMeter meter = new TestMeter(); + TestMeterProvider testProvider = new TestMeterProvider((lib, ver, opts) -> meter); + + MetricsHelper.setMeterProvider(testProvider); + + BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient); + StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint1) + .then(blobCheckpointStore.updateCheckpoint(checkpoint2))) + .verifyComplete(); + + TestGauge seqNo = meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number"); + TestGauge.Subscription subs = seqNo.getSubscriptions().get(0); + subs.measure(); + + TestMeasurement seqNoMeasurement = subs.getMeasurements().get(0); + assertEquals(42L, seqNoMeasurement.getValue()); + + TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoint"); + assertEquals(2, checkpoints.getMeasurements().size()); + + assertEquals(1, checkpoints.getMeasurements().get(0).getValue()); + assertEquals(1, checkpoints.getMeasurements().get(1).getValue()); + assertStatusAttributes(checkpoint2, "ok", checkpoints.getMeasurements().get(1).getAttributes()); + } + + @Test + public void testUpdateEnabledMetricsMultipleHubs() { + Checkpoint checkpoint1 = new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh1") + .setConsumerGroup("cg") + .setPartitionId("0") + .setSequenceNumber(2L) + .setOffset(100L); + + Checkpoint checkpoint2 = new Checkpoint() + .setFullyQualifiedNamespace("ns") + .setEventHubName("eh2") + .setConsumerGroup("cg") + .setPartitionId("0") + .setSequenceNumber(42L) + .setOffset(100L); + when(blobContainerAsyncClient.getBlobAsyncClient("ns/eh1/cg/checkpoint/0")).thenReturn(blobAsyncClient); + when(blobContainerAsyncClient.getBlobAsyncClient("ns/eh2/cg/checkpoint/0")).thenReturn(blobAsyncClient); + + + TestMeter meter = new TestMeter(); + MetricsHelper.setMeterProvider(new TestMeterProvider((lib, ver, opts) -> meter)); + + BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient, null); + StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint1) + .then(blobCheckpointStore.updateCheckpoint(checkpoint2))) + .verifyComplete(); + + TestGauge seqNo = meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number"); + assertEquals(2, seqNo.getSubscriptions().size()); + TestGauge.Subscription subs1 = seqNo.getSubscriptions().get(0); + TestGauge.Subscription subs2 = seqNo.getSubscriptions().get(1); + subs1.measure(); + subs2.measure(); + + TestMeasurement seqNoMeasurement1 = subs1.getMeasurements().get(0); + assertEquals(2L, seqNoMeasurement1.getValue()); + assertCommonAttributes(checkpoint1, seqNoMeasurement1.getAttributes()); + + TestMeasurement seqNoMeasurement2 = subs2.getMeasurements().get(0); + assertEquals(42L, seqNoMeasurement2.getValue()); + assertCommonAttributes(checkpoint2, seqNoMeasurement2.getAttributes()); + + TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoint"); + assertEquals(2, checkpoints.getMeasurements().size()); + + assertEquals(1, checkpoints.getMeasurements().get(0).getValue()); + assertStatusAttributes(checkpoint1, "ok", checkpoints.getMeasurements().get(0).getAttributes()); + assertEquals(1, checkpoints.getMeasurements().get(1).getValue()); + assertStatusAttributes(checkpoint2, "ok", checkpoints.getMeasurements().get(1).getAttributes()); + } + + + private void assertStatusAttributes(Checkpoint checkpoint, String expectedStatus, Map attributes) { + assertEquals(5, attributes.size()); + assertEquals(checkpoint.getFullyQualifiedNamespace(), attributes.get("hostName")); + assertEquals(checkpoint.getEventHubName(), attributes.get("entityName")); + assertEquals(checkpoint.getPartitionId(), attributes.get("partitionId")); + assertEquals(checkpoint.getConsumerGroup(), attributes.get("consumerGroup")); + assertEquals(expectedStatus, attributes.get("status")); + } + + private void assertCommonAttributes(Checkpoint checkpoint, Map attributes) { + assertEquals(4, attributes.size()); + assertEquals(checkpoint.getFullyQualifiedNamespace(), attributes.get("hostName")); + assertEquals(checkpoint.getEventHubName(), attributes.get("entityName")); + assertEquals(checkpoint.getPartitionId(), attributes.get("partitionId")); + assertEquals(checkpoint.getConsumerGroup(), attributes.get("consumerGroup")); + } + + private BlobItem getCheckpointBlobItem(String offset, String sequenceNumber, String blobName) { + Map metadata = new HashMap<>(); + metadata.put("sequencenumber", sequenceNumber); + metadata.put("offset", offset); + return new BlobItem() + .setName(blobName) + .setMetadata(metadata); + } +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java index 30a6edde88b02..51b6d653ce17a 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java @@ -58,7 +58,7 @@ public class BlobEventProcessorClientStoreTest { @BeforeEach public void setup() { - MockitoAnnotations.initMocks(this); + MockitoAnnotations.openMocks(this); } @Test diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/TestMeterProvider.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/TestMeterProvider.java new file mode 100644 index 0000000000000..0bf36956c5c77 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/TestMeterProvider.java @@ -0,0 +1,26 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs.checkpointstore.blob; + +import com.azure.core.util.MetricsOptions; +import com.azure.core.util.metrics.Meter; +import com.azure.core.util.metrics.MeterProvider; + +public class TestMeterProvider implements MeterProvider { + + private final MeterFactory meterFactory; + public TestMeterProvider(MeterFactory meterFactory) { + this.meterFactory = meterFactory; + } + + @Override + public Meter createMeter(String libraryName, String libraryVersion, MetricsOptions options) { + return meterFactory.createMeter(libraryName, libraryVersion, options); + } + + @FunctionalInterface + interface MeterFactory { + Meter createMeter(String libraryName, String libraryVersion, MetricsOptions options); + } +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java index aacd7e8836376..4e777deaa9969 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java @@ -6,6 +6,7 @@ import com.azure.core.amqp.implementation.MessageSerializer; import com.azure.core.amqp.implementation.TracerProvider; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.metrics.Meter; import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor; import com.azure.messaging.eventhubs.implementation.EventHubManagementNode; import reactor.core.publisher.Flux; @@ -33,10 +34,11 @@ class EventHubAsyncClient implements Closeable { private final Runnable onClientClose; private final TracerProvider tracerProvider; private final String identifier; + private final Meter meter; EventHubAsyncClient(EventHubConnectionProcessor connectionProcessor, TracerProvider tracerProvider, MessageSerializer messageSerializer, Scheduler scheduler, boolean isSharedConnection, Runnable onClientClose, - String identifier) { + String identifier, Meter meter) { this.tracerProvider = Objects.requireNonNull(tracerProvider, "'tracerProvider' cannot be null."); this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null."); this.connectionProcessor = Objects.requireNonNull(connectionProcessor, @@ -46,6 +48,7 @@ class EventHubAsyncClient implements Closeable { this.isSharedConnection = isSharedConnection; this.identifier = identifier; + this.meter = meter; } /** @@ -108,7 +111,7 @@ Mono getPartitionProperties(String partitionId) { EventHubProducerAsyncClient createProducer() { return new EventHubProducerAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), getEventHubName(), connectionProcessor, connectionProcessor.getRetryOptions(), tracerProvider, messageSerializer, scheduler, - isSharedConnection, onClientClose, identifier); + isSharedConnection, onClientClose, identifier, meter); } /** @@ -133,7 +136,7 @@ EventHubConsumerAsyncClient createConsumer(String consumerGroup, int prefetchCou return new EventHubConsumerAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), getEventHubName(), connectionProcessor, messageSerializer, consumerGroup, prefetchCount, isSharedConnection, - onClientClose, identifier); + onClientClose, identifier, meter); } /** diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java index 55cd67314eb21..94c03cb5a70a3 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java @@ -34,6 +34,8 @@ import com.azure.core.util.Configuration; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.metrics.Meter; +import com.azure.core.util.metrics.MeterProvider; import com.azure.core.util.tracing.Tracer; import com.azure.messaging.eventhubs.implementation.ClientConstants; import com.azure.messaging.eventhubs.implementation.EventHubAmqpConnection; @@ -174,6 +176,9 @@ public class EventHubClientBuilder implements private static final String EVENTHUBS_PROPERTIES_FILE = "azure-messaging-eventhubs.properties"; private static final String NAME_KEY = "name"; private static final String VERSION_KEY = "version"; + + private static final String LIBRARY_NAME; + private static final String LIBRARY_VERSION; private static final String UNKNOWN = "UNKNOWN"; private static final String AZURE_EVENT_HUBS_CONNECTION_STRING = "AZURE_EVENT_HUBS_CONNECTION_STRING"; @@ -199,6 +204,11 @@ public class EventHubClientBuilder implements private SslDomain.VerifyMode verifyMode; private URL customEndpointAddress; + static { + final Map properties = CoreUtils.getProperties(EVENTHUBS_PROPERTIES_FILE); + LIBRARY_NAME = properties.getOrDefault(NAME_KEY, UNKNOWN); + LIBRARY_VERSION = properties.getOrDefault(VERSION_KEY, UNKNOWN); + } /** * Keeps track of the open clients that were created from this builder when there is a shared connection. */ @@ -794,13 +804,15 @@ EventHubAsyncClient buildAsyncClient() { prefetchCount = DEFAULT_PREFETCH_COUNT; } + final Meter meter = MeterProvider.getDefaultProvider().createMeter(LIBRARY_NAME, LIBRARY_VERSION, + clientOptions == null ? null : clientOptions.getMetricsOptions()); final MessageSerializer messageSerializer = new EventHubMessageSerializer(); final EventHubConnectionProcessor processor; if (isSharedConnection.get()) { synchronized (connectionLock) { if (eventHubConnectionProcessor == null) { - eventHubConnectionProcessor = buildConnectionProcessor(messageSerializer); + eventHubConnectionProcessor = buildConnectionProcessor(messageSerializer, meter); } } @@ -809,7 +821,7 @@ EventHubAsyncClient buildAsyncClient() { final int numberOfOpenClients = openClients.incrementAndGet(); LOGGER.info("# of open clients with shared connection: {}", numberOfOpenClients); } else { - processor = buildConnectionProcessor(messageSerializer); + processor = buildConnectionProcessor(messageSerializer, meter); } final TracerProvider tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class)); @@ -823,8 +835,7 @@ EventHubAsyncClient buildAsyncClient() { } return new EventHubAsyncClient(processor, tracerProvider, messageSerializer, scheduler, - isSharedConnection.get(), this::onClientClose, - identifier); + isSharedConnection.get(), this::onClientClose, identifier, meter); } /** @@ -884,7 +895,7 @@ void onClientClose() { } } - private EventHubConnectionProcessor buildConnectionProcessor(MessageSerializer messageSerializer) { + private EventHubConnectionProcessor buildConnectionProcessor(MessageSerializer messageSerializer, Meter meter) { final ConnectionOptions connectionOptions = getConnectionOptions(); final Flux connectionFlux = Flux.create(sink -> { sink.onRequest(request -> { @@ -906,7 +917,7 @@ private EventHubConnectionProcessor buildConnectionProcessor(MessageSerializer m connectionOptions.getAuthorizationType(), connectionOptions.getFullyQualifiedNamespace(), connectionOptions.getAuthorizationScope()); final ReactorProvider provider = new ReactorProvider(); - final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(provider); + final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(provider, meter); final EventHubAmqpConnection connection = new EventHubReactorAmqpConnection(connectionId, connectionOptions, getEventHubName(), provider, handlerProvider, tokenManagerProvider, @@ -959,18 +970,15 @@ private ConnectionOptions getConnectionOptions() { : SslDomain.VerifyMode.VERIFY_PEER_NAME; final ClientOptions options = clientOptions != null ? clientOptions : new ClientOptions(); - final Map properties = CoreUtils.getProperties(EVENTHUBS_PROPERTIES_FILE); - final String product = properties.getOrDefault(NAME_KEY, UNKNOWN); - final String clientVersion = properties.getOrDefault(VERSION_KEY, UNKNOWN); if (customEndpointAddress == null) { return new ConnectionOptions(getAndValidateFullyQualifiedNamespace(), credentials, authorizationType, ClientConstants.AZURE_ACTIVE_DIRECTORY_SCOPE, transport, retryOptions, proxyOptions, scheduler, - options, verificationMode, product, clientVersion); + options, verificationMode, LIBRARY_NAME, LIBRARY_VERSION); } else { return new ConnectionOptions(getAndValidateFullyQualifiedNamespace(), credentials, authorizationType, ClientConstants.AZURE_ACTIVE_DIRECTORY_SCOPE, transport, retryOptions, proxyOptions, scheduler, - options, verificationMode, product, clientVersion, customEndpointAddress.getHost(), + options, verificationMode, LIBRARY_NAME, LIBRARY_VERSION, customEndpointAddress.getHost(), customEndpointAddress.getPort()); } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java index e4732052970db..ed9453b682258 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java @@ -13,9 +13,11 @@ import com.azure.core.annotation.ServiceClient; import com.azure.core.annotation.ServiceMethod; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.metrics.Meter; import com.azure.messaging.eventhubs.implementation.AmqpReceiveLinkProcessor; import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor; import com.azure.messaging.eventhubs.implementation.EventHubManagementNode; +import com.azure.messaging.eventhubs.implementation.EventHubsMetricsProvider; import com.azure.messaging.eventhubs.models.EventPosition; import com.azure.messaging.eventhubs.models.PartitionEvent; import com.azure.messaging.eventhubs.models.ReceiveOptions; @@ -155,6 +157,7 @@ public class EventHubConsumerAsyncClient implements Closeable { private final boolean isSharedConnection; private final Runnable onClientClosed; private final String identifier; + private final EventHubsMetricsProvider metricsProvider; /** * Keeps track of the open partition consumers keyed by linkName. The link name is generated as: {@code * "partitionId_GUID"}. For receiving from all partitions, links are prefixed with {@code "all-GUID-partitionId"}. @@ -164,7 +167,7 @@ public class EventHubConsumerAsyncClient implements Closeable { EventHubConsumerAsyncClient(String fullyQualifiedNamespace, String eventHubName, EventHubConnectionProcessor connectionProcessor, MessageSerializer messageSerializer, String consumerGroup, - int prefetchCount, boolean isSharedConnection, Runnable onClientClosed, String identifier) { + int prefetchCount, boolean isSharedConnection, Runnable onClientClosed, String identifier, Meter meter) { this.fullyQualifiedNamespace = fullyQualifiedNamespace; this.eventHubName = eventHubName; this.connectionProcessor = connectionProcessor; @@ -174,6 +177,7 @@ public class EventHubConsumerAsyncClient implements Closeable { this.isSharedConnection = isSharedConnection; this.onClientClosed = onClientClosed; this.identifier = identifier; + this.metricsProvider = new EventHubsMetricsProvider(meter, fullyQualifiedNamespace, eventHubName, consumerGroup); } /** @@ -417,6 +421,7 @@ private Flux createConsumer(String linkName, String partitionId, .computeIfAbsent(linkName, name -> createPartitionConsumer(name, partitionId, startingPosition, receiveOptions)) .receive() + .doOnNext(event -> metricsProvider.reportReceive(event)) .doFinally(signal -> removeLink(linkName, partitionId, signal)); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java index ab1a09217b9ed..f332af33aef62 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java @@ -18,10 +18,12 @@ import com.azure.core.util.Context; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.metrics.Meter; import com.azure.core.util.tracing.ProcessKind; import com.azure.messaging.eventhubs.implementation.ClientConstants; import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor; import com.azure.messaging.eventhubs.implementation.EventHubManagementNode; +import com.azure.messaging.eventhubs.implementation.EventHubsMetricsProvider; import com.azure.messaging.eventhubs.models.CreateBatchOptions; import com.azure.messaging.eventhubs.models.SendOptions; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; @@ -198,6 +200,8 @@ public class EventHubProducerAsyncClient implements Closeable { private final Runnable onClientClose; private final String identifier; + private final EventHubsMetricsProvider metricsProvider; + /** * Creates a new instance of this {@link EventHubProducerAsyncClient} that can send messages to a single partition * when {@link CreateBatchOptions#getPartitionId()} is not null or an empty string. Otherwise, allows the service to @@ -206,7 +210,7 @@ public class EventHubProducerAsyncClient implements Closeable { EventHubProducerAsyncClient(String fullyQualifiedNamespace, String eventHubName, EventHubConnectionProcessor connectionProcessor, AmqpRetryOptions retryOptions, TracerProvider tracerProvider, MessageSerializer messageSerializer, Scheduler scheduler, boolean isSharedConnection, Runnable onClientClose, - String identifier) { + String identifier, Meter meter) { this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null."); this.eventHubName = Objects.requireNonNull(eventHubName, "'eventHubName' cannot be null."); @@ -221,6 +225,7 @@ public class EventHubProducerAsyncClient implements Closeable { this.scheduler = scheduler; this.isSharedConnection = isSharedConnection; this.identifier = identifier; + this.metricsProvider = new EventHubsMetricsProvider(meter, fullyQualifiedNamespace, eventHubName, null); } /** @@ -317,7 +322,9 @@ public Mono createBatch(CreateBatchOptions options) { MAX_PARTITION_KEY_LENGTH))); } - return getSendLink(partitionId) + final String entityPath = getEntityPath(partitionId); + + return getSendLink(entityPath) .flatMap(link -> link.getLinkSize() .flatMap(size -> { final int maximumLinkSize = size > 0 @@ -574,7 +581,8 @@ public Mono send(EventDataBatch batch) { parentContext.set(tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, sharedContext, ProcessKind.SEND)); } - final Mono sendMessage = getSendLink(batch.getPartitionId()) + final String entityPath = getEntityPath(batch.getPartitionId()); + final Mono sendMessage = getSendLink(entityPath) .flatMap(link -> messages.size() == 1 ? link.send(messages.get(0)) : link.send(messages)); @@ -583,9 +591,11 @@ public Mono send(EventDataBatch batch) { String.format("partitionId[%s]: Sending messages timed out.", batch.getPartitionId())) .publishOn(scheduler) .doOnEach(signal -> { + Context context = isTracingEnabled ? parentContext.get() : Context.NONE; if (isTracingEnabled) { - tracerProvider.endSpan(parentContext.get(), signal); + tracerProvider.endSpan(context, signal); } + metricsProvider.reportBatchSend(batch, batch.getPartitionId(), signal.getThrowable(), context); }); } @@ -601,7 +611,8 @@ private Mono sendInternal(Flux events, SendOptions options) { partitionKey, partitionId))); } - return getSendLink(options.getPartitionId()) + final String entityPath = getEntityPath(options.getPartitionId()); + return getSendLink(entityPath) .flatMap(link -> link.getLinkSize() .flatMap(size -> { final int batchSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES; @@ -630,9 +641,8 @@ private String getEntityPath(String partitionId) { : String.format(Locale.US, SENDER_ENTITY_PATH_FORMAT, eventHubName, partitionId); } - private Mono getSendLink(String partitionId) { - final String entityPath = getEntityPath(partitionId); - final String linkName = getEntityPath(partitionId); + private Mono getSendLink(String entityPath) { + final String linkName = entityPath; return connectionProcessor .flatMap(connection -> connection.createSendLink(linkName, entityPath, retryOptions, identifier)); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java new file mode 100644 index 0000000000000..d87cbe160dc12 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java @@ -0,0 +1,108 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs.implementation; + +import com.azure.core.util.Context; +import com.azure.core.util.TelemetryAttributes; +import com.azure.core.util.metrics.DoubleHistogram; +import com.azure.core.util.metrics.LongCounter; +import com.azure.core.util.metrics.Meter; +import com.azure.messaging.eventhubs.EventDataBatch; +import com.azure.messaging.eventhubs.models.PartitionEvent; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_NAME_KEY; + +public class EventHubsMetricsProvider { + private static final String PARTITION_ID_KEY = "partitionId"; + private final Meter meter; + private final boolean isEnabled; + + private AttributeCache sendAttributeCacheSuccess; + private AttributeCache sendAttributeCacheFailure; + private AttributeCache receiveAttributeCache; + private LongCounter sentEventsCounter; + private DoubleHistogram consumerLag; + + public EventHubsMetricsProvider(Meter meter, String namespace, String entityName, String consumerGroup) { + this.meter = meter; + this.isEnabled = meter != null && meter.isEnabled(); + if (this.isEnabled) { + Map commonAttributesMap = new HashMap<>(); + commonAttributesMap.put("hostName", namespace); + commonAttributesMap.put(ENTITY_NAME_KEY, entityName); + if (consumerGroup != null) { + commonAttributesMap.put("consumerGroup", consumerGroup); + } + + Map successMap = new HashMap<>(commonAttributesMap); + successMap.put("status", "ok"); + this.sendAttributeCacheSuccess = new AttributeCache(PARTITION_ID_KEY, successMap); + + Map failureMap = new HashMap<>(commonAttributesMap); + failureMap.put("status", "error"); + this.sendAttributeCacheFailure = new AttributeCache(PARTITION_ID_KEY, failureMap); + + this.receiveAttributeCache = new AttributeCache(PARTITION_ID_KEY, commonAttributesMap); + this.sentEventsCounter = meter.createLongCounter("messaging.eventhubs.events.sent", "Number of sent events", "events"); + this.consumerLag = meter.createDoubleHistogram("messaging.eventhubs.consumer.lag", "Difference between local time when event was received and the local time it was enqueued on broker.", "sec"); + } + } + + public void reportBatchSend(EventDataBatch batch, String partitionId, Throwable throwable, Context context) { + if (isEnabled && sentEventsCounter.isEnabled()) { + // entity name is already reported + AttributeCache cache = throwable == null ? sendAttributeCacheSuccess : sendAttributeCacheFailure; + sentEventsCounter.add(batch.getCount(), cache.getOrCreate(partitionId), context); + } + } + + public void reportReceive(PartitionEvent event) { + if (isEnabled && consumerLag.isEnabled()) { + Instant enqueuedTime = event.getData().getEnqueuedTime(); + double diff = 0d; + if (enqueuedTime != null) { + diff = Instant.now().toEpochMilli() - enqueuedTime.toEpochMilli(); + if (diff < 0) { + // time skew on machines + diff = 0; + } + } + consumerLag.record(diff / 1000d, + receiveAttributeCache.getOrCreate(event.getPartitionContext().getPartitionId()), + Context.NONE); + } + } + + class AttributeCache { + private final Map attr = new ConcurrentHashMap<>(); + private final TelemetryAttributes commonAttr; + private final Map commonMap; + private final String dimensionName; + + AttributeCache(String dimensionName, Map common) { + this.dimensionName = dimensionName; + this.commonMap = common; + this.commonAttr = meter.createAttributes(commonMap); + } + + public TelemetryAttributes getOrCreate(String value) { + if (value == null) { + return commonAttr; + } + + return attr.computeIfAbsent(value, this::create); + } + + private TelemetryAttributes create(String value) { + Map attributes = new HashMap<>(commonMap); + attributes.put(dimensionName, value); + return meter.createAttributes(attributes); + } + } +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java index ec4748fbf0dce..32df913bad0be 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java @@ -12,9 +12,13 @@ import com.azure.core.amqp.implementation.MessageSerializer; import com.azure.core.amqp.models.CbsAuthorizationType; import com.azure.core.credential.TokenCredential; +import com.azure.core.test.utils.metrics.TestHistogram; +import com.azure.core.test.utils.metrics.TestMeasurement; +import com.azure.core.test.utils.metrics.TestMeter; import com.azure.core.util.ClientOptions; import com.azure.core.util.Configuration; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.metrics.Meter; import com.azure.messaging.eventhubs.implementation.ClientConstants; import com.azure.messaging.eventhubs.implementation.EventHubAmqpConnection; import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor; @@ -23,6 +27,7 @@ import com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties; import com.azure.messaging.eventhubs.models.PartitionEvent; import com.azure.messaging.eventhubs.models.ReceiveOptions; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.engine.SslDomain; import org.apache.qpid.proton.message.Message; import org.junit.jupiter.api.AfterAll; @@ -48,16 +53,21 @@ import java.time.Duration; import java.time.Instant; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME; import static com.azure.messaging.eventhubs.EventHubClientBuilder.DEFAULT_PREFETCH_COUNT; import static com.azure.messaging.eventhubs.TestUtils.getMessage; import static com.azure.messaging.eventhubs.TestUtils.isMatchingEvent; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -86,6 +96,7 @@ class EventHubConsumerAsyncClientTest { private static final String CONSUMER_GROUP = "consumer-group-test"; private static final String PARTITION_ID = "a-partition-id"; private static final String CLIENT_IDENTIFIER = "my-client-identifier"; + private static final Meter DEFAULT_METER = null; private static final ClientLogger LOGGER = new ClientLogger(EventHubConsumerAsyncClientTest.class); private final AmqpRetryOptions retryOptions = new AmqpRetryOptions().setMaxRetries(2); @@ -148,7 +159,7 @@ AmqpTransportType.AMQP_WEB_SOCKETS, new AmqpRetryOptions(), ProxyOptions.SYSTEM_ "event-hub-name", connectionOptions.getRetry())); consumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, - CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER); + CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); } @AfterEach @@ -170,7 +181,7 @@ void teardown() throws Exception { @Test void lastEnqueuedEventInformationIsNull() { final EventHubConsumerAsyncClient runtimeConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, messageSerializer, CONSUMER_GROUP, DEFAULT_PREFETCH_COUNT, false, onClientClosed, CLIENT_IDENTIFIER); + connectionProcessor, messageSerializer, CONSUMER_GROUP, DEFAULT_PREFETCH_COUNT, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final int numberOfEvents = 10; when(amqpReceiveLink.getCredits()).thenReturn(numberOfEvents); final int numberToReceive = 3; @@ -193,7 +204,7 @@ void lastEnqueuedEventInformationIsNull() { void lastEnqueuedEventInformationCreated() { // Arrange final EventHubConsumerAsyncClient runtimeConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, messageSerializer, CONSUMER_GROUP, DEFAULT_PREFETCH_COUNT, false, onClientClosed, CLIENT_IDENTIFIER); + connectionProcessor, messageSerializer, CONSUMER_GROUP, DEFAULT_PREFETCH_COUNT, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final int numberOfEvents = 10; final ReceiveOptions receiveOptions = new ReceiveOptions().setTrackLastEnqueuedEventProperties(true); when(amqpReceiveLink.getCredits()).thenReturn(numberOfEvents); @@ -204,7 +215,7 @@ void lastEnqueuedEventInformationCreated() { .then(() -> sendMessages(messageProcessor, numberOfEvents, PARTITION_ID)) .assertNext(event -> { LastEnqueuedEventProperties properties = event.getLastEnqueuedEventProperties(); - Assertions.assertNotNull(properties); + assertNotNull(properties); Assertions.assertNull(properties.getOffset()); Assertions.assertNull(properties.getSequenceNumber()); Assertions.assertNull(properties.getRetrievalTime()); @@ -246,7 +257,7 @@ void receivesNumberOfEventsAllowsBlock() throws InterruptedException { // Scheduling on elastic to simulate a user passed in scheduler (this is the default in EventHubClientBuilder). final EventHubConsumerAsyncClient myConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER); + connectionProcessor, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final Flux eventsFlux = myConsumer.receiveFromPartition(PARTITION_ID, EventPosition.earliest()) .take(numberOfEvents); @@ -306,7 +317,7 @@ void returnsNewListener() { any(ReceiveOptions.class), anyString())).thenReturn(Mono.just(link2), Mono.just(link3)); EventHubConsumerAsyncClient asyncClient = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER); + eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); // Act & Assert StepVerifier.create(asyncClient.receiveFromPartition(PARTITION_ID, EventPosition.earliest()).take(numberOfEvents)) @@ -529,7 +540,7 @@ void receivesMultiplePartitions() { .thenReturn(Mono.just(new EventHubProperties(EVENT_HUB_NAME, Instant.EPOCH, partitions))); EventHubConsumerAsyncClient asyncClient = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER); + eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); TestPublisher processor2 = TestPublisher.createCold(); AmqpReceiveLink link2 = mock(AmqpReceiveLink.class); @@ -604,7 +615,7 @@ void receivesMultiplePartitionsWhenOneCloses() { .thenReturn(Mono.just(new EventHubProperties(EVENT_HUB_NAME, Instant.EPOCH, partitions))); EventHubConsumerAsyncClient asyncClient = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER); + eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); TestPublisher processor2 = TestPublisher.create(); AmqpReceiveLink link2 = mock(AmqpReceiveLink.class); @@ -664,7 +675,7 @@ void doesNotCloseSharedConnection() { EventHubConnectionProcessor eventHubConnection = Flux.create(sink -> sink.next(connection1)) .subscribeWith(new EventHubConnectionProcessor(HOSTNAME, EVENT_HUB_NAME, retryOptions)); EventHubConsumerAsyncClient sharedConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, true, onClientClosed, CLIENT_IDENTIFIER); + eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, true, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); // Act sharedConsumer.close(); @@ -684,7 +695,7 @@ void closesDedicatedConnection() { // Arrange EventHubConnectionProcessor hubConnection = mock(EventHubConnectionProcessor.class); EventHubConsumerAsyncClient dedicatedConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - hubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER); + hubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); // Act dedicatedConsumer.close(); @@ -695,6 +706,123 @@ void closesDedicatedConnection() { verifyNoInteractions(onClientClosed); } + @Test + void receiveReportsMetrics() { + // Arrange + final int numberOfEvents = 2; + when(amqpReceiveLink.getCredits()).thenReturn(numberOfEvents); + + TestMeter meter = new TestMeter(); + consumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, + CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, meter); + + Flux receive = consumer.receiveFromPartition(PARTITION_ID, EventPosition.earliest()) + .filter(e -> isMatchingEvent(e, messageTrackingUUID)) + .take(numberOfEvents); + + Instant enqueuedTime = Instant.now().minusSeconds(1000); + sendMessages(messageProcessor, numberOfEvents, PARTITION_ID, enqueuedTime); + + // Act + StepVerifier.create(receive) + .thenConsumeWhile(e -> { + TestHistogram consumerLag = meter.getHistograms().get("messaging.eventhubs.consumer.lag"); + assertNotNull(consumerLag); + Instant afterReceived = Instant.now(); + + List> measurements = consumerLag.getMeasurements(); + TestMeasurement last = measurements.get(measurements.size() - 1); + assertEquals(Duration.between(enqueuedTime, afterReceived).toMillis(), last.getValue() * 1000, 10d); + assertAttributes(EVENT_HUB_NAME, e.getPartitionContext().getPartitionId(), last.getAttributes()); + return true; + }) + .verifyComplete(); + + assertEquals(numberOfEvents, meter.getHistograms().get("messaging.eventhubs.consumer.lag").getMeasurements().size()); + } + + @Test + void receiveReportsMetricsNegativeLag() { + // Arrange + when(amqpReceiveLink.getCredits()).thenReturn(1); + + TestMeter meter = new TestMeter(); + consumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, + CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, meter); + + Flux receive = consumer.receiveFromPartition(PARTITION_ID, EventPosition.earliest()) + .filter(e -> isMatchingEvent(e, messageTrackingUUID)) + .take(1); + + Instant enqueuedTime1 = Instant.now().plusSeconds(1000); + sendMessages(messageProcessor, 1, PARTITION_ID, enqueuedTime1); + + // Act + StepVerifier.create(receive) + .consumeNextWith(e -> { + TestHistogram consumerLag = meter.getHistograms().get("messaging.eventhubs.consumer.lag"); + assertNotNull(consumerLag); + Instant afterReceived = Instant.now(); + + List> measurements = consumerLag.getMeasurements(); + TestMeasurement last = measurements.get(measurements.size() - 1); + assertEquals(0, last.getValue()); + assertAttributes(EVENT_HUB_NAME, e.getPartitionContext().getPartitionId(), last.getAttributes()); + }) + .verifyComplete(); + + assertEquals(1, meter.getHistograms().get("messaging.eventhubs.consumer.lag").getMeasurements().size()); + } + + @Test + void receiveDoesNotReportDisabledMetrics() { + // Arrange + when(amqpReceiveLink.getCredits()).thenReturn(1); + + TestMeter meter = new TestMeter(false); + consumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, + CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, meter); + + Flux receive = consumer.receiveFromPartition(PARTITION_ID, EventPosition.earliest()) + .filter(e -> isMatchingEvent(e, messageTrackingUUID)) + .take(1); + + // Act + sendMessages(messageProcessor, 1, PARTITION_ID); + StepVerifier.create(receive) + .expectNextCount(1) + .verifyComplete(); + + assertFalse(meter.getHistograms().containsKey("messaging.eventhubs.consumer.lag")); + } + + @Test + void receiveNullMeterDoesNotThrow() { + // Arrange + when(amqpReceiveLink.getCredits()).thenReturn(1); + + consumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, + CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, null); + + Flux receive = consumer.receiveFromPartition(PARTITION_ID, EventPosition.earliest()) + .filter(e -> isMatchingEvent(e, messageTrackingUUID)) + .take(1); + + // Act + sendMessages(messageProcessor, 1, PARTITION_ID); + StepVerifier.create(receive) + .expectNextCount(1) + .verifyComplete(); + } + + private void assertAttributes(String entityName, String entityPath, Map attributes) { + assertEquals(4, attributes.size()); + assertEquals(HOSTNAME, attributes.get("hostName")); + assertEquals(entityName, attributes.get("entityName")); + assertEquals(CONSUMER_GROUP, attributes.get("consumerGroup")); + assertEquals(entityPath, attributes.get("partitionId")); + } + private void assertPartition(String partitionId, PartitionEvent event) { System.out.println("Event received: " + event.getPartitionContext().getPartitionId()); final Object value = event.getData().getProperties().get(PARTITION_ID_HEADER); @@ -713,4 +841,14 @@ private void sendMessages(TestPublisher testPublisher, int numberOfEven testPublisher.next(getMessage(PAYLOAD_BYTES, messageTrackingUUID, map)); } } + + private void sendMessages(TestPublisher testPublisher, int numberOfEvents, String partitionId, Instant enqueueTime) { + Map map = Collections.singletonMap(PARTITION_ID_HEADER, partitionId); + + for (int i = 0; i < numberOfEvents; i++) { + Message message = getMessage(PAYLOAD_BYTES, messageTrackingUUID, map); + message.getMessageAnnotations().getValue().put(Symbol.valueOf(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue()), enqueueTime); + testPublisher.next(message); + } + } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientTest.java index 64881caece816..b1da9c2be2059 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientTest.java @@ -15,6 +15,7 @@ import com.azure.core.util.ClientOptions; import com.azure.core.util.Configuration; import com.azure.core.util.IterableStream; +import com.azure.core.util.metrics.Meter; import com.azure.messaging.eventhubs.implementation.ClientConstants; import com.azure.messaging.eventhubs.implementation.EventHubAmqpConnection; import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor; @@ -74,6 +75,7 @@ public class EventHubConsumerClientTest { private static final String CLIENT_IDENTIFIER = "my-client-identifier"; private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(4); private static final Duration TIMEOUT = Duration.ofSeconds(30); + private static final Meter DEFAULT_METER = null; private final String messageTrackingUUID = UUID.randomUUID().toString(); private final TestPublisher messageProcessor = TestPublisher.createCold(); @@ -138,7 +140,7 @@ AmqpTransportType.AMQP_WEB_SOCKETS, new AmqpRetryOptions(), ProxyOptions.SYSTEM_ when(connection.closeAsync()).thenReturn(Mono.empty()); asyncConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, - connectionProcessor, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER); + connectionProcessor, messageSerializer, CONSUMER_GROUP, PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); consumer = new EventHubConsumerClient(asyncConsumer, Duration.ofSeconds(10)); } @@ -164,7 +166,7 @@ public void lastEnqueuedEventInformationIsNull() { // Arrange final EventHubConsumerAsyncClient runtimeConsumer = new EventHubConsumerAsyncClient( HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, CONSUMER_GROUP, - PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER); + PREFETCH, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final EventHubConsumerClient consumer = new EventHubConsumerClient(runtimeConsumer, Duration.ofSeconds(5)); final int numberOfEvents = 10; sendMessages(messageProcessor, numberOfEvents, PARTITION_ID); @@ -194,7 +196,7 @@ public void lastEnqueuedEventInformationCreated() { final ReceiveOptions options = new ReceiveOptions().setTrackLastEnqueuedEventProperties(true); final EventHubConsumerAsyncClient runtimeConsumer = new EventHubConsumerAsyncClient( HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, CONSUMER_GROUP, PREFETCH, - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final EventHubConsumerClient consumer = new EventHubConsumerClient(runtimeConsumer, Duration.ofSeconds(5)); final int numberOfEvents = 10; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java index bb83e04b454ad..72137f1a25201 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java @@ -17,10 +17,14 @@ import com.azure.core.amqp.implementation.TracerProvider; import com.azure.core.amqp.models.CbsAuthorizationType; import com.azure.core.credential.TokenCredential; +import com.azure.core.test.utils.metrics.TestCounter; +import com.azure.core.test.utils.metrics.TestMeasurement; +import com.azure.core.test.utils.metrics.TestMeter; import com.azure.core.util.ClientOptions; import com.azure.core.util.Configuration; import com.azure.core.util.Context; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.metrics.Meter; import com.azure.core.util.tracing.ProcessKind; import com.azure.core.util.tracing.Tracer; import com.azure.messaging.eventhubs.implementation.ClientConstants; @@ -55,6 +59,7 @@ import java.time.Instant; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -71,6 +76,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; @@ -94,7 +100,7 @@ class EventHubProducerAsyncClientTest { private static final String ENTITY_PATH = HOSTNAME + Configuration.getGlobalConfiguration() .get("AZURE_EVENTHUBS_ENDPOINT_SUFFIX", ".servicebus.windows.net"); private static final ClientLogger LOGGER = new ClientLogger(EventHubProducerAsyncClient.class); - + private static final Meter DEFAULT_METER = null; @Mock private AmqpSendLink sendLink; @Mock @@ -161,7 +167,7 @@ void setup(TestInfo testInfo) { new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(), "event-hub-path", connectionOptions.getRetry())); producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, testScheduler, false, onClientClosed, CLIENT_IDENTIFIER); + tracerProvider, messageSerializer, testScheduler, false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); when(sendLink.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES)); when(sendLink2.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES)); @@ -255,7 +261,7 @@ void sendSingleMessageWithBlock() throws InterruptedException { // In our actual client builder, we allow this. final EventHubProducerAsyncClient flexibleProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, tracerProvider, messageSerializer, testScheduler, - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); // EC is the prefix they use when creating a link that sends to the service round-robin. when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), eq(retryOptions), eq(CLIENT_IDENTIFIER))) @@ -334,7 +340,7 @@ void sendStartSpanSingleMessage() { final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), any(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(sendLink)); @@ -402,7 +408,7 @@ void sendMessageRetrySpanTest() { TracerProvider tracerProvider = new TracerProvider(tracers); producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER); + tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final String failureKey = "fail"; final EventData testData = new EventData("test") @@ -539,7 +545,7 @@ void startMessageSpansOnCreateBatch() { TracerProvider tracerProvider = new TracerProvider(tracers); final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final AmqpSendLink link = mock(AmqpSendLink.class); when(link.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES)); @@ -822,6 +828,148 @@ void sendsAnEventDataBatch() { verify(link, times(2)).getLinkSize(); } + @Test + void sendsAnEventDataBatchWithMetrics() { + String eventHub1 = EVENT_HUB_NAME + "1"; + String eventHub2 = EVENT_HUB_NAME + "2"; + + when(connection.createSendLink(eq(eventHub1), eq(eventHub1), any(), any())).thenReturn(Mono.just(sendLink)); + when(connection.createSendLink(eq(eventHub2), eq(eventHub2), any(), any())).thenReturn(Mono.just(sendLink2)); + when(sendLink.send(anyList())).thenReturn(Mono.empty()); + when(sendLink.getHostname()).thenReturn(HOSTNAME); + when(sendLink.getEntityPath()).thenReturn(eventHub1); + when(sendLink.send(any(Message.class))).thenReturn(Mono.empty()); + + when(sendLink2.send(anyList())).thenReturn(Mono.empty()); + when(sendLink2.getHostname()).thenReturn(HOSTNAME); + when(sendLink2.getEntityPath()).thenReturn(eventHub1); + when(sendLink2.send(any(Message.class))).thenReturn(Mono.empty()); + + TestMeter meter = new TestMeter(); + EventHubProducerAsyncClient producer1 = new EventHubProducerAsyncClient(HOSTNAME, eventHub1, + connectionProcessor, retryOptions, + tracerProvider, messageSerializer, testScheduler, false, onClientClosed, CLIENT_IDENTIFIER, meter); + + EventHubProducerAsyncClient producer2 = new EventHubProducerAsyncClient(HOSTNAME, eventHub2, + connectionProcessor, retryOptions, + tracerProvider, messageSerializer, testScheduler, false, onClientClosed, CLIENT_IDENTIFIER, meter); + + StepVerifier.create(producer1.createBatch() + .flatMap(batch -> { + batch.tryAdd(new EventData("1")); + return producer1.send(batch); + }) + .then(producer2.createBatch(). + flatMap(batch -> { + batch.tryAdd(new EventData("2")); + batch.tryAdd(new EventData("3")); + return producer2.send(batch); + }))) + .verifyComplete(); + + TestCounter eventCounter = meter.getCounters().get("messaging.eventhubs.events.sent"); + assertNotNull(eventCounter); + + List> measurements = eventCounter.getMeasurements(); + assertEquals(2, measurements.size()); + + assertEquals(1, measurements.get(0).getValue()); + assertEquals(2, measurements.get(1).getValue()); + assertAttributes(eventHub1, null, "ok", measurements.get(0).getAttributes()); + assertAttributes(eventHub2, null, "ok", measurements.get(1).getAttributes()); + } + + @Test + void sendsAnEventDataBatchWithMetricsFailure() { + when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), any(), any())).thenReturn(Mono.just(sendLink)); + when(sendLink.send(anyList())).thenReturn(Mono.empty()); + when(sendLink.getHostname()).thenReturn(HOSTNAME); + when(sendLink.getEntityPath()).thenReturn(EVENT_HUB_NAME); + when(sendLink.send(any(Message.class))).thenReturn(Mono.error(new RuntimeException("foo"))); + + TestMeter meter = new TestMeter(); + EventHubProducerAsyncClient producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, + connectionProcessor, retryOptions, + tracerProvider, messageSerializer, testScheduler, false, onClientClosed, CLIENT_IDENTIFIER, meter); + + StepVerifier.create(producer.send(new EventData("1"))) + .expectErrorMessage("foo") + .verify(); + + TestCounter eventCounter = meter.getCounters().get("messaging.eventhubs.events.sent"); + assertNotNull(eventCounter); + + List> measurements = eventCounter.getMeasurements(); + assertEquals(1, measurements.size()); + + assertEquals(1, measurements.get(0).getValue()); + assertAttributes(EVENT_HUB_NAME, null, "error", measurements.get(0).getAttributes()); + } + + @Test + void sendsAnEventDataBatchWithMetricsPartitionId() { + String partitionId = "1"; + String entityPath = EVENT_HUB_NAME + "/Partitions/" + partitionId; + when(connection.createSendLink(eq(entityPath), eq(entityPath), any(), any())).thenReturn(Mono.just(sendLink)); + when(sendLink.send(anyList())).thenReturn(Mono.empty()); + when(sendLink.getHostname()).thenReturn(HOSTNAME); + when(sendLink.getEntityPath()).thenReturn(entityPath); + when(sendLink.getLinkName()).thenReturn(entityPath); + when(sendLink.send(any(Message.class))).thenReturn(Mono.empty()); + + TestMeter meter = new TestMeter(); + EventHubProducerAsyncClient producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, + connectionProcessor, retryOptions, tracerProvider, messageSerializer, testScheduler, false, onClientClosed, CLIENT_IDENTIFIER, meter); + + SendOptions options = new SendOptions().setPartitionId(partitionId); + StepVerifier.create(producer.send(new EventData("1"), options)) + .verifyComplete(); + + TestCounter eventCounter = meter.getCounters().get("messaging.eventhubs.events.sent"); + assertNotNull(eventCounter); + + List> measurements = eventCounter.getMeasurements(); + assertEquals(1, measurements.size()); + + assertEquals(1, measurements.get(0).getValue()); + assertAttributes(EVENT_HUB_NAME, partitionId, "ok", measurements.get(0).getAttributes()); + } + + @Test + void sendsAnEventDataBatchWithDisabledMetrics() { + when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), any(), any())).thenReturn(Mono.just(sendLink)); + when(sendLink.send(anyList())).thenReturn(Mono.empty()); + when(sendLink.getHostname()).thenReturn(HOSTNAME); + when(sendLink.getEntityPath()).thenReturn(EVENT_HUB_NAME); + when(sendLink.getLinkName()).thenReturn(EVENT_HUB_NAME); + when(sendLink.send(any(Message.class))).thenReturn(Mono.empty()); + + TestMeter meter = new TestMeter(false); + EventHubProducerAsyncClient producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, + connectionProcessor, retryOptions, tracerProvider, messageSerializer, testScheduler, false, onClientClosed, CLIENT_IDENTIFIER, meter); + + StepVerifier.create(producer.send(new EventData("1"))) + .verifyComplete(); + + assertFalse(meter.getCounters().containsKey("messaging.eventhubs.events.sent")); + } + + @Test + void sendsAnEventDataBatchWithNullMeterDoesNotThrow() { + when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), any(), any())).thenReturn(Mono.just(sendLink)); + when(sendLink.send(anyList())).thenReturn(Mono.empty()); + when(sendLink.getHostname()).thenReturn(HOSTNAME); + when(sendLink.getEntityPath()).thenReturn(EVENT_HUB_NAME); + when(sendLink.getLinkName()).thenReturn(EVENT_HUB_NAME); + when(sendLink.send(any(Message.class))).thenReturn(Mono.empty()); + + EventHubProducerAsyncClient producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, + connectionProcessor, retryOptions, tracerProvider, messageSerializer, testScheduler, false, onClientClosed, CLIENT_IDENTIFIER, null); + + StepVerifier.create(producer.send(new EventData("1"))) + .verifyComplete(); + } + /** * Verify we can send messages to multiple partitionIds with same sender. */ @@ -888,7 +1036,7 @@ void doesNotCloseSharedConnection() { EventHubConnectionProcessor hubConnection = mock(EventHubConnectionProcessor.class); EventHubProducerAsyncClient sharedProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, hubConnection, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), - true, onClientClosed, CLIENT_IDENTIFIER); + true, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); // Act sharedProducer.close(); @@ -907,7 +1055,7 @@ void closesDedicatedConnection() { EventHubConnectionProcessor hubConnection = mock(EventHubConnectionProcessor.class); EventHubProducerAsyncClient dedicatedProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, hubConnection, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); // Act dedicatedProducer.close(); @@ -926,7 +1074,7 @@ void closesDedicatedConnectionOnlyOnce() { EventHubConnectionProcessor hubConnection = mock(EventHubConnectionProcessor.class); EventHubProducerAsyncClient dedicatedProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, hubConnection, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); // Act dedicatedProducer.close(); @@ -962,7 +1110,7 @@ void reopensOnFailure() { new EventHubConnectionProcessor(EVENT_HUB_NAME, connectionOptions.getFullyQualifiedNamespace(), connectionOptions.getRetry())); producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER); + tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final int count = 4; final byte[] contents = TEST_CONTENTS.getBytes(UTF_8); @@ -1037,7 +1185,7 @@ void closesOnNonTransientFailure() { new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(), EVENT_HUB_NAME, connectionOptions.getRetry())); producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER); + tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final int count = 4; final byte[] contents = TEST_CONTENTS.getBytes(UTF_8); @@ -1113,7 +1261,7 @@ void resendMessageOnTransientLinkFailure() { new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(), EVENT_HUB_NAME, connectionOptions.getRetry())); producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER); + tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final int count = 4; final byte[] contents = TEST_CONTENTS.getBytes(UTF_8); @@ -1167,6 +1315,14 @@ void resendMessageOnTransientLinkFailure() { verifyNoInteractions(onClientClosed); } + private void assertAttributes(String entityName, String entityPath, String status, Map attributes) { + assertEquals(entityPath == null ? 3 : 4, attributes.size()); + assertEquals(HOSTNAME, attributes.get("hostName")); + assertEquals(entityName, attributes.get("entityName")); + assertEquals(entityPath, attributes.get("partitionId")); + assertEquals(status, attributes.get("status")); + } + private static final String TEST_CONTENTS = "SSLorem ipsum dolor sit amet, consectetur adipiscing elit. Donec " + "vehicula posuere lobortis. Aliquam finibus volutpat dolor, faucibus pellentesque ipsum bibendum vitae. " + "Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Ut sit amet " diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java index fd3582cc80b9a..d751e4b57fd8d 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java @@ -18,6 +18,7 @@ import com.azure.core.credential.TokenCredential; import com.azure.core.util.ClientOptions; import com.azure.core.util.Context; +import com.azure.core.util.metrics.Meter; import com.azure.core.util.tracing.ProcessKind; import com.azure.core.util.tracing.Tracer; import com.azure.messaging.eventhubs.implementation.ClientConstants; @@ -77,6 +78,7 @@ public class EventHubProducerClientTest { private static final String HOSTNAME = "my-host-name"; private static final String EVENT_HUB_NAME = "my-event-hub-name"; private static final String CLIENT_IDENTIFIER = "my-client-identifier"; + private static final Meter DEFAULT_METER = null; private final AmqpRetryOptions retryOptions = new AmqpRetryOptions().setTryTimeout(Duration.ofSeconds(30)); private final MessageSerializer messageSerializer = new EventHubMessageSerializer(); @@ -117,7 +119,7 @@ public void setup() { .subscribeWith(new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(), "event-hub-path", connectionOptions.getRetry())); asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, - tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER); + tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); when(connection.getEndpointStates()).thenReturn(Flux.create(sink -> sink.next(AmqpEndpointState.ACTIVE))); when(connection.closeAsync()).thenReturn(Mono.empty()); @@ -171,7 +173,7 @@ public void sendStartSpanSingleMessage() { final TracerProvider tracerProvider = new TracerProvider(tracers); final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); final EventData eventData = new EventData("hello-world".getBytes(UTF_8)); @@ -246,7 +248,7 @@ public void sendMessageRetrySpanTest() { final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); final EventData eventData = new EventData("hello-world".getBytes(UTF_8)) .addContext(SPAN_CONTEXT_KEY, "span-context"); @@ -300,7 +302,7 @@ public void sendEventsExceedsBatchSize() { TracerProvider tracerProvider = new TracerProvider(Collections.emptyList()); final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); //Act & Assert @@ -403,7 +405,7 @@ public void startsMessageSpanOnEventBatch() { final TracerProvider tracerProvider = new TracerProvider(tracers); final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), - false, onClientClosed, CLIENT_IDENTIFIER); + false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_METER); final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); final AmqpSendLink link = mock(AmqpSendLink.class); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java index 9da77be80884d..9ddf74d072824 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java @@ -41,7 +41,7 @@ import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY; import static com.azure.core.util.tracing.Tracer.MESSAGE_ENQUEUED_TIME; -import static com.azure.core.util.tracing.Tracer.PARENT_SPAN_KEY; +import static com.azure.core.util.tracing.Tracer.PARENT_TRACE_CONTEXT_KEY; import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; import static com.azure.messaging.eventhubs.EventHubClientBuilder.DEFAULT_PREFETCH_COUNT; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -147,7 +147,7 @@ public void testWithSimplePartitionProcessor() throws Exception { return passed.addData(SPAN_CONTEXT_KEY, "value1") .addData("scope", (AutoCloseable) () -> { }) - .addData(PARENT_SPAN_KEY, "value2"); + .addData(PARENT_TRACE_CONTEXT_KEY, "value2"); } ); @@ -239,7 +239,7 @@ public void testProcessSpans() throws Exception { assertTrue(passed.getData(MESSAGE_ENQUEUED_TIME).isPresent()); return passed.addData(SPAN_CONTEXT_KEY, "value1").addData("scope", (AutoCloseable) () -> { return; - }).addData(PARENT_SPAN_KEY, "value2"); + }).addData(PARENT_TRACE_CONTEXT_KEY, "value2"); } ); @@ -303,7 +303,7 @@ public void testProcessSpansWithoutDiagnosticId() throws Exception { assertTrue(passed.getData(MESSAGE_ENQUEUED_TIME).isPresent()); return passed.addData(SPAN_CONTEXT_KEY, "value1").addData("scope", (AutoCloseable) () -> { return; - }).addData(PARENT_SPAN_KEY, "value2"); + }).addData(PARENT_TRACE_CONTEXT_KEY, "value2"); } ); @@ -628,7 +628,7 @@ public void testSingleEventReceiveHeartBeat() throws InterruptedException { Context passed = invocation.getArgument(1, Context.class); return passed.addData(SPAN_CONTEXT_KEY, "value1").addData("scope", (AutoCloseable) () -> { return; - }).addData(PARENT_SPAN_KEY, "value2"); + }).addData(PARENT_TRACE_CONTEXT_KEY, "value2"); } ); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/CBSChannelTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/CBSChannelTest.java index ef750a325041f..b363ef59af407 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/CBSChannelTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/CBSChannelTest.java @@ -20,6 +20,7 @@ import com.azure.core.amqp.implementation.TokenManagerProvider; import com.azure.core.amqp.models.CbsAuthorizationType; import com.azure.core.credential.TokenCredential; +import com.azure.core.test.utils.metrics.TestMeter; import com.azure.core.util.ClientOptions; import com.azure.core.util.CoreUtils; import com.azure.core.util.Header; @@ -89,7 +90,8 @@ protected void beforeTest() { retryOptions = new AmqpRetryOptions().setTryTimeout(Duration.ofMinutes(1)); reactorProvider = new ReactorProvider(); - handlerProvider = new ReactorHandlerProvider(reactorProvider); + + handlerProvider = new ReactorHandlerProvider(reactorProvider, new TestMeter(false)); clientOptions.setHeaders( Arrays.asList(new Header("name", product), new Header("version", clientVersion))); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/EventHubReactorConnectionTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/EventHubReactorConnectionTest.java index c4e35363a7ec5..f00225382b96d 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/EventHubReactorConnectionTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/EventHubReactorConnectionTest.java @@ -6,6 +6,7 @@ import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.ProxyOptions; +import com.azure.core.amqp.implementation.AmqpMetricsProvider; import com.azure.core.amqp.implementation.ConnectionOptions; import com.azure.core.amqp.implementation.MessageSerializer; import com.azure.core.amqp.implementation.ReactorDispatcher; @@ -124,7 +125,7 @@ AmqpTransportType.AMQP, new AmqpRetryOptions(), proxy, scheduler, clientOptions, final SslPeerDetails peerDetails = Proton.sslPeerDetails(HOSTNAME, ConnectionHandler.AMQPS_PORT); connectionHandler = new ConnectionHandler(CONNECTION_ID, connectionOptions, - peerDetails); + peerDetails, AmqpMetricsProvider.noop()); when(reactor.selectable()).thenReturn(selectable); when(reactor.connectionToHost(connectionHandler.getHostname(), connectionHandler.getProtocolPort(), @@ -140,7 +141,7 @@ AmqpTransportType.AMQP, new AmqpRetryOptions(), proxy, scheduler, clientOptions, .thenReturn(reactor); final SessionHandler sessionHandler = new SessionHandler(CONNECTION_ID, HOSTNAME, "EVENT_HUB", - reactorDispatcher, Duration.ofSeconds(20)); + reactorDispatcher, Duration.ofSeconds(20), AmqpMetricsProvider.noop()); when(handlerProvider.createConnectionHandler(CONNECTION_ID, connectionOptions)) .thenReturn(connectionHandler); @@ -178,10 +179,10 @@ public void getsManagementChannel() { when(receiver.attachments()).thenReturn(linkRecord); when(handlerProvider.createReceiveLinkHandler(eq(CONNECTION_ID), eq(HOSTNAME), anyString(), anyString())) - .thenReturn(new ReceiveLinkHandler(CONNECTION_ID, HOSTNAME, "receiver-name", "test-entity-path")); + .thenReturn(new ReceiveLinkHandler(CONNECTION_ID, HOSTNAME, "receiver-name", "test-entity-path", AmqpMetricsProvider.noop())); when(handlerProvider.createSendLinkHandler(eq(CONNECTION_ID), eq(HOSTNAME), anyString(), anyString())) - .thenReturn(new SendLinkHandler(CONNECTION_ID, HOSTNAME, "sender-name", "test-entity-path")); + .thenReturn(new SendLinkHandler(CONNECTION_ID, HOSTNAME, "sender-name", "test-entity-path", AmqpMetricsProvider.noop())); final EventHubReactorAmqpConnection connection = new EventHubReactorAmqpConnection(CONNECTION_ID, connectionOptions, "event-hub-name", reactorProvider, handlerProvider, tokenManagerProvider, From 12d778281b6e4ae9516ef6b6520cea1f820dc973 Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Mon, 19 Sep 2022 10:31:33 -0700 Subject: [PATCH 2/5] cleanup --- .../opentelemetry/OpenTelemetryAttributes.java | 3 +++ .../OpenTelemetryAttributesTests.java | 8 +++++++- .../checkpointstore/blob/MetricsHelper.java | 16 ++++++++++------ .../blob/BlobCheckpointStoreMetricsTests.java | 14 +++++++------- .../eventhubs/EventHubClientBuilder.java | 1 + .../eventhubs/EventHubProducerAsyncClient.java | 15 ++++++--------- .../implementation/EventHubsMetricsProvider.java | 11 +++++++---- 7 files changed, 41 insertions(+), 27 deletions(-) diff --git a/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java b/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java index 5c5d0a32280f7..f9b938a263ac3 100644 --- a/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java +++ b/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java @@ -21,6 +21,7 @@ class OpenTelemetryAttributes implements TelemetryAttributes { private static Map getMappings() { Map mappings = new HashMap<>(); // messaging mapping, attributes are defined in com.azure.core.amqp.implementation.ClientConstants + mappings.put("status", "otel.status_code"); mappings.put("entityName", "messaging.destination"); mappings.put("entityPath", "messaging.az.entity_path"); mappings.put("hostName", "net.peer.name"); @@ -28,6 +29,8 @@ private static Map getMappings() { mappings.put("amqpStatusCode", "amqp.status_code"); mappings.put("amqpOperation", "amqp.operation"); mappings.put("deliveryState", "amqp.delivery_state"); + mappings.put("partitionId", "messaging.az.partition_id"); + mappings.put("consumerGroup", "messaging.eventhubs.consumer_group"); return Collections.unmodifiableMap(mappings); } diff --git a/sdk/core/azure-core-metrics-opentelemetry/src/test/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributesTests.java b/sdk/core/azure-core-metrics-opentelemetry/src/test/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributesTests.java index 35bccbca890d8..63870a09b1b71 100644 --- a/sdk/core/azure-core-metrics-opentelemetry/src/test/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributesTests.java +++ b/sdk/core/azure-core-metrics-opentelemetry/src/test/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributesTests.java @@ -69,12 +69,15 @@ public void attributeMappings() { put("deliveryState", "rejected"); put("amqpStatusCode", "no_content"); put("amqpOperation", "peek"); + put("partitionId", 42); + put("status", "error"); + put("consumerGroup", "$Default"); }}); assertEquals(OpenTelemetryAttributes.class, attributeCollection.getClass()); Attributes attributes = ((OpenTelemetryAttributes) attributeCollection).get(); - assertEquals(8, attributes.size()); + assertEquals(11, attributes.size()); assertEquals("value", attributes.get(AttributeKey.stringKey("foobar"))); assertEquals("host", attributes.get(AttributeKey.stringKey("net.peer.name"))); assertEquals("entity", attributes.get(AttributeKey.stringKey("messaging.destination"))); @@ -83,6 +86,9 @@ public void attributeMappings() { assertEquals("rejected", attributes.get(AttributeKey.stringKey("amqp.delivery_state"))); assertEquals("peek", attributes.get(AttributeKey.stringKey("amqp.operation"))); assertEquals("no_content", attributes.get(AttributeKey.stringKey("amqp.status_code"))); + assertEquals(42, attributes.get(AttributeKey.longKey("messaging.az.partition_id"))); + assertEquals("error", attributes.get(AttributeKey.stringKey("otel.status_code"))); + assertEquals("$Default", attributes.get(AttributeKey.stringKey("messaging.eventhubs.consumer_group"))); } @Test diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java index 7f69e7631e3f0..cb1a4718f40b6 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java @@ -21,10 +21,14 @@ class MetricsHelper { private static final ClientLogger LOGGER = new ClientLogger(MetricsHelper.class); - public static final String ENTITY_NAME_KEY = "entityName"; - public static final String HOSTNAME_KEY = "hostName"; - public static final String PARTITION_ID_KEY = "partitionId"; - public static final String CONSUMER_GROUP_KEY = "consumerGroup"; + + // Make sure attribute names are consistent across AMQP Core, EventHubs, ServiceBus when applicable + // and mapped correctly in OTel Metrics https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java + private static final String ENTITY_NAME_KEY = "entityName"; + private static final String HOSTNAME_KEY = "hostName"; + private static final String PARTITION_ID_KEY = "partitionId"; + private static final String CONSUMER_GROUP_KEY = "consumerGroup"; + private static final String STATUS_KEY = "status"; // since checkpoint store is stateless it might be used for endless number of eventhubs. // we'll have as many subscriptions as there are combinations of fqdn, eventhub name, partitionId and consumer group. @@ -67,7 +71,7 @@ class MetricsHelper { this.isEnabled = meter != null && meter.isEnabled(); if (isEnabled) { this.lastSequenceNumber = this.meter.createLongGauge("messaging.eventhubs.checkpoint.sequence_number", "Last successfully checkpointed sequence number.", "seqNo"); - this.checkpointCounter = this.meter.createLongCounter("messaging.eventhubs.checkpoint", "Number of checkpoints.", null); + this.checkpointCounter = this.meter.createLongCounter("messaging.eventhubs.checkpoints", "Number of checkpoints.", null); } else { this.lastSequenceNumber = null; this.checkpointCounter = null; @@ -124,7 +128,7 @@ private Map createAttributes(Checkpoint checkpoint, String statu attributesMap.put(PARTITION_ID_KEY, checkpoint.getPartitionId()); attributesMap.put(CONSUMER_GROUP_KEY, checkpoint.getConsumerGroup()); if (status != null) { - attributesMap.put("status", status); + attributesMap.put(STATUS_KEY, status); } return attributesMap; diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStoreMetricsTests.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStoreMetricsTests.java index 1bd402842ff56..4470f951ad0a3 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStoreMetricsTests.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStoreMetricsTests.java @@ -170,8 +170,8 @@ public void testUpdateEnabledMetrics() { assertEquals(2L, seqNoMeasurement.getValue()); assertCommonAttributes(checkpoint, seqNoMeasurement.getAttributes()); - assertTrue(meter.getCounters().containsKey("messaging.eventhubs.checkpoint")); - TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoint"); + assertTrue(meter.getCounters().containsKey("messaging.eventhubs.checkpoints")); + TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints"); assertEquals(1, checkpoints.getMeasurements().size()); TestMeasurement checkpointMeasurements = checkpoints.getMeasurements().get(0); assertEquals(1, checkpointMeasurements.getValue()); @@ -202,7 +202,7 @@ public void testUpdateEnabledMetricsFailure() { // sequence number is only reported for successfull checkpoints assertEquals(0, meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number").getSubscriptions().size()); - TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoint"); + TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints"); TestMeasurement checkpointMeasurements = checkpoints.getMeasurements().get(0); assertEquals(1, checkpointMeasurements.getValue()); assertStatusAttributes(checkpoint, "error", checkpointMeasurements.getAttributes()); @@ -225,7 +225,7 @@ public void testUpdateEnabledMetricsNullSeqNo() { assertEquals(0, meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number").getSubscriptions().size()); - TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoint"); + TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints"); TestMeasurement checkpointMeasurements = checkpoints.getMeasurements().get(0); assertEquals(1, checkpointMeasurements.getValue()); assertStatusAttributes(checkpoint, "ok", checkpointMeasurements.getAttributes()); @@ -265,7 +265,7 @@ public void testUpdateEnabledMetricsTooManyAttributes() { i[0]++; }); - TestCounter checkpointCounter = meter.getCounters().get("messaging.eventhubs.checkpoint"); + TestCounter checkpointCounter = meter.getCounters().get("messaging.eventhubs.checkpoints"); assertEquals(MAX_ATTRIBUTES_SETS, checkpointCounter.getMeasurements().size()); final int[] j = {0}; @@ -311,7 +311,7 @@ public void testUpdateEnabledMetricsMultipleMeasurements() { TestMeasurement seqNoMeasurement = subs.getMeasurements().get(0); assertEquals(42L, seqNoMeasurement.getValue()); - TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoint"); + TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints"); assertEquals(2, checkpoints.getMeasurements().size()); assertEquals(1, checkpoints.getMeasurements().get(0).getValue()); @@ -363,7 +363,7 @@ public void testUpdateEnabledMetricsMultipleHubs() { assertEquals(42L, seqNoMeasurement2.getValue()); assertCommonAttributes(checkpoint2, seqNoMeasurement2.getAttributes()); - TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoint"); + TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints"); assertEquals(2, checkpoints.getMeasurements().size()); assertEquals(1, checkpoints.getMeasurements().get(0).getValue()); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java index 94c03cb5a70a3..cc9fca6d6c3c0 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java @@ -209,6 +209,7 @@ public class EventHubClientBuilder implements LIBRARY_NAME = properties.getOrDefault(NAME_KEY, UNKNOWN); LIBRARY_VERSION = properties.getOrDefault(VERSION_KEY, UNKNOWN); } + /** * Keeps track of the open clients that were created from this builder when there is a shared connection. */ diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java index f332af33aef62..096138d537f45 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java @@ -322,9 +322,7 @@ public Mono createBatch(CreateBatchOptions options) { MAX_PARTITION_KEY_LENGTH))); } - final String entityPath = getEntityPath(partitionId); - - return getSendLink(entityPath) + return getSendLink(partitionId) .flatMap(link -> link.getLinkSize() .flatMap(size -> { final int maximumLinkSize = size > 0 @@ -581,8 +579,7 @@ public Mono send(EventDataBatch batch) { parentContext.set(tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, sharedContext, ProcessKind.SEND)); } - final String entityPath = getEntityPath(batch.getPartitionId()); - final Mono sendMessage = getSendLink(entityPath) + final Mono sendMessage = getSendLink(batch.getPartitionId()) .flatMap(link -> messages.size() == 1 ? link.send(messages.get(0)) : link.send(messages)); @@ -592,10 +589,10 @@ public Mono send(EventDataBatch batch) { .publishOn(scheduler) .doOnEach(signal -> { Context context = isTracingEnabled ? parentContext.get() : Context.NONE; + metricsProvider.reportBatchSend(batch, batch.getPartitionId(), signal.getThrowable(), context); if (isTracingEnabled) { tracerProvider.endSpan(context, signal); } - metricsProvider.reportBatchSend(batch, batch.getPartitionId(), signal.getThrowable(), context); }); } @@ -611,8 +608,7 @@ private Mono sendInternal(Flux events, SendOptions options) { partitionKey, partitionId))); } - final String entityPath = getEntityPath(options.getPartitionId()); - return getSendLink(entityPath) + return getSendLink(options.getPartitionId()) .flatMap(link -> link.getLinkSize() .flatMap(size -> { final int batchSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES; @@ -641,7 +637,8 @@ private String getEntityPath(String partitionId) { : String.format(Locale.US, SENDER_ENTITY_PATH_FORMAT, eventHubName, partitionId); } - private Mono getSendLink(String entityPath) { + private Mono getSendLink(String partitionId) { + final String entityPath = getEntityPath(partitionId); final String linkName = entityPath; return connectionProcessor diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java index d87cbe160dc12..d6c64cdda376b 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java @@ -17,9 +17,12 @@ import java.util.concurrent.ConcurrentHashMap; import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_NAME_KEY; +import static com.azure.core.amqp.implementation.ClientConstants.HOSTNAME_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.CONSUMER_GROUP_KEY; public class EventHubsMetricsProvider { private static final String PARTITION_ID_KEY = "partitionId"; + private static final String SEND_STATUS_KEY = "status"; private final Meter meter; private final boolean isEnabled; @@ -34,18 +37,18 @@ public EventHubsMetricsProvider(Meter meter, String namespace, String entityName this.isEnabled = meter != null && meter.isEnabled(); if (this.isEnabled) { Map commonAttributesMap = new HashMap<>(); - commonAttributesMap.put("hostName", namespace); + commonAttributesMap.put(HOSTNAME_KEY, namespace); commonAttributesMap.put(ENTITY_NAME_KEY, entityName); if (consumerGroup != null) { - commonAttributesMap.put("consumerGroup", consumerGroup); + commonAttributesMap.put(CONSUMER_GROUP_KEY, consumerGroup); } Map successMap = new HashMap<>(commonAttributesMap); - successMap.put("status", "ok"); + successMap.put(SEND_STATUS_KEY, "ok"); this.sendAttributeCacheSuccess = new AttributeCache(PARTITION_ID_KEY, successMap); Map failureMap = new HashMap<>(commonAttributesMap); - failureMap.put("status", "error"); + failureMap.put(SEND_STATUS_KEY, "error"); this.sendAttributeCacheFailure = new AttributeCache(PARTITION_ID_KEY, failureMap); this.receiveAttributeCache = new AttributeCache(PARTITION_ID_KEY, commonAttributesMap); From b884505bff54721c167017a9c18d562de0f3436e Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Tue, 20 Sep 2022 12:47:54 -0700 Subject: [PATCH 3/5] tests --- .../messaging/eventhubs/EventHubConsumerAsyncClientTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java index 32df913bad0be..2fcf3cdbb98ca 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java @@ -732,7 +732,7 @@ void receiveReportsMetrics() { List> measurements = consumerLag.getMeasurements(); TestMeasurement last = measurements.get(measurements.size() - 1); - assertEquals(Duration.between(enqueuedTime, afterReceived).toMillis(), last.getValue() * 1000, 10d); + assertEquals(Duration.between(enqueuedTime, afterReceived).toMillis() / 1000d, last.getValue(), 1); assertAttributes(EVENT_HUB_NAME, e.getPartitionContext().getPartitionId(), last.getAttributes()); return true; }) From ffff685d3eb2ec7d3a4e0c2a45474930a5450798 Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Wed, 21 Sep 2022 14:26:50 -0700 Subject: [PATCH 4/5] review --- .../OpenTelemetryAttributes.java | 2 +- .../OpenTelemetryAttributesTests.java | 2 +- .../blob/BlobCheckpointStore.java | 7 +- .../checkpointstore/blob/MetricsHelper.java | 33 ++--- .../BlobEventProcessorClientStoreTest.java | 17 ++- ...ricsTests.java => MetricsHelperTests.java} | 122 ++++-------------- .../EventHubsMetricsProvider.java | 9 +- 7 files changed, 67 insertions(+), 125 deletions(-) rename sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/{BlobCheckpointStoreMetricsTests.java => MetricsHelperTests.java} (71%) diff --git a/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java b/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java index f9b938a263ac3..969e62671f88c 100644 --- a/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java +++ b/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java @@ -29,7 +29,7 @@ private static Map getMappings() { mappings.put("amqpStatusCode", "amqp.status_code"); mappings.put("amqpOperation", "amqp.operation"); mappings.put("deliveryState", "amqp.delivery_state"); - mappings.put("partitionId", "messaging.az.partition_id"); + mappings.put("partitionId", "messaging.eventhubs.partition_id"); mappings.put("consumerGroup", "messaging.eventhubs.consumer_group"); return Collections.unmodifiableMap(mappings); diff --git a/sdk/core/azure-core-metrics-opentelemetry/src/test/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributesTests.java b/sdk/core/azure-core-metrics-opentelemetry/src/test/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributesTests.java index 63870a09b1b71..87520a083bc64 100644 --- a/sdk/core/azure-core-metrics-opentelemetry/src/test/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributesTests.java +++ b/sdk/core/azure-core-metrics-opentelemetry/src/test/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributesTests.java @@ -86,7 +86,7 @@ public void attributeMappings() { assertEquals("rejected", attributes.get(AttributeKey.stringKey("amqp.delivery_state"))); assertEquals("peek", attributes.get(AttributeKey.stringKey("amqp.operation"))); assertEquals("no_content", attributes.get(AttributeKey.stringKey("amqp.status_code"))); - assertEquals(42, attributes.get(AttributeKey.longKey("messaging.az.partition_id"))); + assertEquals(42, attributes.get(AttributeKey.longKey("messaging.eventhubs.partition_id"))); assertEquals("error", attributes.get(AttributeKey.stringKey("otel.status_code"))); assertEquals("$Default", attributes.get(AttributeKey.stringKey("messaging.eventhubs.consumer_group"))); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java index 79429a78e7865..ea9d10685f26d 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java @@ -7,6 +7,7 @@ import com.azure.core.util.ClientOptions; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.metrics.MeterProvider; import com.azure.messaging.eventhubs.CheckpointStore; import com.azure.messaging.eventhubs.EventProcessorClient; import com.azure.messaging.eventhubs.models.Checkpoint; @@ -90,7 +91,7 @@ public BlobCheckpointStore(BlobContainerAsyncClient blobContainerAsyncClient) { */ public BlobCheckpointStore(BlobContainerAsyncClient blobContainerAsyncClient, ClientOptions options) { this.blobContainerAsyncClient = blobContainerAsyncClient; - this.metricsHelper = new MetricsHelper(options == null ? null : options.getMetricsOptions()); + this.metricsHelper = new MetricsHelper(options == null ? null : options.getMetricsOptions(), MeterProvider.getDefaultProvider()); } /** @@ -273,7 +274,9 @@ public Mono updateCheckpoint(Checkpoint checkpoint) { } }) .doOnEach(signal -> { - metricsHelper.reportCheckpoint(checkpoint, blobName, !signal.hasError()); + if (signal.isOnComplete() || signal.isOnError()) { + metricsHelper.reportCheckpoint(checkpoint, blobName, !signal.hasError()); + } }); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java index cb1a4718f40b6..27d68b90e9769 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java @@ -19,7 +19,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; -class MetricsHelper { +final class MetricsHelper { private static final ClientLogger LOGGER = new ClientLogger(MetricsHelper.class); // Make sure attribute names are consistent across AMQP Core, EventHubs, ServiceBus when applicable @@ -41,8 +41,6 @@ class MetricsHelper { private static final String VERSION_KEY = "version"; private static final String LIBRARY_NAME; private static final String LIBRARY_VERSION; - private static MeterProvider meterProvider = MeterProvider.getDefaultProvider(); - private static final String UNKNOWN = "UNKNOWN"; static { @@ -62,13 +60,16 @@ class MetricsHelper { private final LongGauge lastSequenceNumber; private final LongCounter checkpointCounter; private final boolean isEnabled; - MetricsHelper(MetricsOptions metricsOptions) { - if (metricsOptions != null && !metricsOptions.isEnabled()) { - this.meter = null; - } else { + + MetricsHelper(MetricsOptions metricsOptions, MeterProvider meterProvider) { + if (areMetricsEnabled(metricsOptions)) { this.meter = meterProvider.createMeter(LIBRARY_NAME, LIBRARY_VERSION, metricsOptions); + this.isEnabled = this.meter.isEnabled(); + } else { + this.isEnabled = false; + this.meter = null; } - this.isEnabled = meter != null && meter.isEnabled(); + if (isEnabled) { this.lastSequenceNumber = this.meter.createLongGauge("messaging.eventhubs.checkpoint.sequence_number", "Last successfully checkpointed sequence number.", "seqNo"); this.checkpointCounter = this.meter.createLongCounter("messaging.eventhubs.checkpoints", "Number of checkpoints.", null); @@ -78,14 +79,6 @@ class MetricsHelper { } } - /** - * Sets test meter provider. - */ - static synchronized void setMeterProvider(MeterProvider testMeterProvider) { - // TODO (lmolkova) add TestMeterProvider to azure-core-test - MetricsHelper.meterProvider = testMeterProvider; - } - void reportCheckpoint(Checkpoint checkpoint, String attributesId, boolean success) { if (!isEnabled || !(lastSequenceNumber.isEnabled() && checkpointCounter.isEnabled())) { return; @@ -160,6 +153,14 @@ private void updateCurrentValue(String attributesId, Checkpoint checkpoint) { } } + private static boolean areMetricsEnabled(MetricsOptions options) { + if (options == null || options.isEnabled()) { + return true; + } + + return false; + } + private static class CurrentValue { private final AtomicReference lastSeqNo; private final AutoCloseable subscription; diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java index 51b6d653ce17a..d511e855dc7b1 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java @@ -17,11 +17,13 @@ import com.azure.storage.blob.models.BlobItem; import com.azure.storage.blob.models.BlobItemProperties; import com.azure.storage.blob.models.ListBlobsOptions; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -56,9 +58,20 @@ public class BlobEventProcessorClientStoreTest { @Mock private BlobAsyncClient blobAsyncClient; + private AutoCloseable autoCloseable; + @BeforeEach - public void setup() { - MockitoAnnotations.openMocks(this); + public void beforeEach() { + this.autoCloseable = MockitoAnnotations.openMocks(this); + } + + @AfterEach + public void afterEach() throws Exception { + if (autoCloseable != null) { + autoCloseable.close(); + } + + Mockito.framework().clearInlineMock(this); } @Test diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStoreMetricsTests.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelperTests.java similarity index 71% rename from sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStoreMetricsTests.java rename to sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelperTests.java index 4470f951ad0a3..8c4b80d5073b5 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStoreMetricsTests.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelperTests.java @@ -3,35 +3,19 @@ package com.azure.messaging.eventhubs.checkpointstore.blob; -import com.azure.core.http.HttpHeaders; -import com.azure.core.http.rest.PagedFlux; -import com.azure.core.http.rest.PagedResponseBase; import com.azure.core.test.utils.metrics.TestCounter; import com.azure.core.test.utils.metrics.TestGauge; import com.azure.core.test.utils.metrics.TestMeasurement; import com.azure.core.test.utils.metrics.TestMeter; -import com.azure.core.util.ClientOptions; import com.azure.core.util.MetricsOptions; import com.azure.core.util.metrics.Meter; import com.azure.messaging.eventhubs.models.Checkpoint; -import com.azure.storage.blob.BlobAsyncClient; -import com.azure.storage.blob.BlobContainerAsyncClient; import com.azure.storage.blob.models.BlobItem; -import com.azure.storage.blob.models.ListBlobsOptions; -import com.azure.storage.blob.specialized.BlockBlobAsyncClient; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; import org.junit.jupiter.api.parallel.Isolated; -import org.mockito.ArgumentMatchers; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; - -import java.util.Arrays; + import java.util.HashMap; import java.util.List; import java.util.Map; @@ -40,7 +24,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyMap; @@ -52,36 +35,9 @@ @Execution(ExecutionMode.SAME_THREAD) @Isolated -public class BlobCheckpointStoreMetricsTests { - +public class MetricsHelperTests { private static final int MAX_ATTRIBUTES_SETS = 100; - @Mock - private BlobContainerAsyncClient blobContainerAsyncClient; - - @Mock - private BlockBlobAsyncClient blockBlobAsyncClient; - - @Mock - private BlobAsyncClient blobAsyncClient; - - @BeforeEach - public void setUp() { - MockitoAnnotations.openMocks(this); - BlobItem blobItem = getCheckpointBlobItem("230", "1", "ns/eh/cg/checkpoint/0"); - PagedFlux response = new PagedFlux(() -> Mono.just(new PagedResponseBase(null, 200, null, - Arrays.asList(blobItem), null, - null))); - - when(blobContainerAsyncClient.getBlobAsyncClient("ns/eh/cg/checkpoint/0")).thenReturn(blobAsyncClient); - when(blobContainerAsyncClient.listBlobs(any(ListBlobsOptions.class))).thenReturn(response); - when(blobAsyncClient.getBlockBlobAsyncClient()).thenReturn(blockBlobAsyncClient); - when(blobAsyncClient.exists()).thenReturn(Mono.just(true)); - when(blobAsyncClient.setMetadata(ArgumentMatchers.>any())) - .thenReturn(Mono.empty()); - } - @Test public void testUpdateDisabledMetrics() { Checkpoint checkpoint = new Checkpoint() @@ -93,18 +49,17 @@ public void testUpdateDisabledMetrics() { .setOffset(100L); Meter meter = mock(Meter.class); + when(meter.isEnabled()).thenReturn(false); + TestMeterProvider testProvider = new TestMeterProvider((lib, ver, opts) -> { assertEquals("azure-messaging-eventhubs-checkpointstore-blob", lib); assertNotNull(ver); - assertNull(opts); return meter; }); - when(meter.isEnabled()).thenReturn(false); - MetricsHelper.setMeterProvider(testProvider); - BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient); - StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint)).verifyComplete(); + MetricsHelper helper = new MetricsHelper(new MetricsOptions(), testProvider); + helper.reportCheckpoint(checkpoint, "ns/eh/ch/0", true); verify(meter, atLeastOnce()).isEnabled(); verify(meter, never()).createAttributes(anyMap()); @@ -124,11 +79,8 @@ public void testUpdateDisabledMetricsViaOptions() { Meter meter = mock(Meter.class); TestMeterProvider testProvider = new TestMeterProvider((lib, ver, opts) -> meter); - MetricsHelper.setMeterProvider(testProvider); - - ClientOptions disabledMetrics = new ClientOptions().setMetricsOptions(new MetricsOptions().setEnabled(false)); - BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient, disabledMetrics); - StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint)).verifyComplete(); + MetricsHelper helper = new MetricsHelper(new MetricsOptions().setEnabled(false), testProvider); + helper.reportCheckpoint(checkpoint, "ns/eh/cg/0", true); verify(meter, never()).createAttributes(anyMap()); verify(meter, never()).createLongGauge(any(), any(), any()); @@ -149,14 +101,11 @@ public void testUpdateEnabledMetrics() { TestMeterProvider testProvider = new TestMeterProvider((lib, ver, opts) -> { assertEquals("azure-messaging-eventhubs-checkpointstore-blob", lib); assertNotNull(ver); - assertNull(opts); return meter; }); - MetricsHelper.setMeterProvider(testProvider); - - BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient); - StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint)).verifyComplete(); + MetricsHelper helper = new MetricsHelper(new MetricsOptions(), testProvider); + helper.reportCheckpoint(checkpoint, "ns/eh/cg/0", true); assertTrue(meter.getGauges().containsKey("messaging.eventhubs.checkpoint.sequence_number")); TestGauge seqNo = meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number"); @@ -190,16 +139,10 @@ public void testUpdateEnabledMetricsFailure() { .setOffset(100L); TestMeter meter = new TestMeter(); - MetricsHelper.setMeterProvider(new TestMeterProvider((lib, ver, opts) -> meter)); - - BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient); - when(blobAsyncClient.exists()).thenReturn(Mono.defer(() -> Mono.error(new RuntimeException("foo")))); + MetricsHelper helper = new MetricsHelper(new MetricsOptions(), new TestMeterProvider((lib, ver, opts) -> meter)); + helper.reportCheckpoint(checkpoint, "ns/eh/cg/0", false); - StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint)) - .expectErrorMatches(e -> e instanceof RuntimeException) - .verify(); - - // sequence number is only reported for successfull checkpoints + // sequence number is only reported for successful checkpoints assertEquals(0, meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number").getSubscriptions().size()); TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints"); @@ -218,10 +161,8 @@ public void testUpdateEnabledMetricsNullSeqNo() { .setOffset(100L); TestMeter meter = new TestMeter(); - MetricsHelper.setMeterProvider(new TestMeterProvider((lib, ver, opts) -> meter)); - - BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient); - StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint)).verifyComplete(); + MetricsHelper helper = new MetricsHelper(new MetricsOptions(), new TestMeterProvider((lib, ver, opts) -> meter)); + helper.reportCheckpoint(checkpoint, "ns/eh/cg/0", true); assertEquals(0, meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number").getSubscriptions().size()); @@ -234,11 +175,6 @@ public void testUpdateEnabledMetricsNullSeqNo() { @Test public void testUpdateEnabledMetricsTooManyAttributes() { TestMeter meter = new TestMeter(); - MetricsHelper.setMeterProvider(new TestMeterProvider((lib, ver, opts) -> meter)); - - BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient); - when(blobContainerAsyncClient.getBlobAsyncClient(any())).thenReturn(blobAsyncClient); - List checkpoints = IntStream.range(0, MAX_ATTRIBUTES_SETS + 10) .mapToObj(n -> new Checkpoint() .setFullyQualifiedNamespace("ns") @@ -248,9 +184,9 @@ public void testUpdateEnabledMetricsTooManyAttributes() { .setSequenceNumber((long) n) .setOffset(100L)) .collect(Collectors.toList()); - StepVerifier.create(Flux.fromIterable(checkpoints) - .flatMap(c -> blobCheckpointStore.updateCheckpoint(c))) - .verifyComplete(); + + MetricsHelper helper = new MetricsHelper(new MetricsOptions(), new TestMeterProvider((lib, ver, opts) -> meter)); + checkpoints.forEach(ch -> helper.reportCheckpoint(ch, "ns/eh/cg/" + ch.getPartitionId(), true)); List subscriptions = meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number").getSubscriptions(); assertEquals(MAX_ATTRIBUTES_SETS, subscriptions.size()); @@ -295,14 +231,9 @@ public void testUpdateEnabledMetricsMultipleMeasurements() { .setOffset(100L); TestMeter meter = new TestMeter(); - TestMeterProvider testProvider = new TestMeterProvider((lib, ver, opts) -> meter); - - MetricsHelper.setMeterProvider(testProvider); - - BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient); - StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint1) - .then(blobCheckpointStore.updateCheckpoint(checkpoint2))) - .verifyComplete(); + MetricsHelper helper = new MetricsHelper(new MetricsOptions(), new TestMeterProvider((lib, ver, opts) -> meter)); + helper.reportCheckpoint(checkpoint1, "ns/eh/cg/0", true); + helper.reportCheckpoint(checkpoint2, "ns/eh/cg/0", true); TestGauge seqNo = meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number"); TestGauge.Subscription subs = seqNo.getSubscriptions().get(0); @@ -336,17 +267,12 @@ public void testUpdateEnabledMetricsMultipleHubs() { .setPartitionId("0") .setSequenceNumber(42L) .setOffset(100L); - when(blobContainerAsyncClient.getBlobAsyncClient("ns/eh1/cg/checkpoint/0")).thenReturn(blobAsyncClient); - when(blobContainerAsyncClient.getBlobAsyncClient("ns/eh2/cg/checkpoint/0")).thenReturn(blobAsyncClient); - TestMeter meter = new TestMeter(); - MetricsHelper.setMeterProvider(new TestMeterProvider((lib, ver, opts) -> meter)); + MetricsHelper helper = new MetricsHelper(new MetricsOptions(), new TestMeterProvider((lib, ver, opts) -> meter)); - BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient, null); - StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint1) - .then(blobCheckpointStore.updateCheckpoint(checkpoint2))) - .verifyComplete(); + helper.reportCheckpoint(checkpoint1, "ns/eh1/cg/0", true); + helper.reportCheckpoint(checkpoint2, "ns/eh2/cg/0", true); TestGauge seqNo = meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number"); assertEquals(2, seqNo.getSubscriptions().size()); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java index d6c64cdda376b..d015b7e682041 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java @@ -19,10 +19,10 @@ import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_NAME_KEY; import static com.azure.core.amqp.implementation.ClientConstants.HOSTNAME_KEY; import static com.azure.messaging.eventhubs.implementation.ClientConstants.CONSUMER_GROUP_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY; public class EventHubsMetricsProvider { - private static final String PARTITION_ID_KEY = "partitionId"; - private static final String SEND_STATUS_KEY = "status"; + private static final String GENERIC_STATUS_KEY = "status"; private final Meter meter; private final boolean isEnabled; @@ -44,11 +44,11 @@ public EventHubsMetricsProvider(Meter meter, String namespace, String entityName } Map successMap = new HashMap<>(commonAttributesMap); - successMap.put(SEND_STATUS_KEY, "ok"); + successMap.put(GENERIC_STATUS_KEY, "ok"); this.sendAttributeCacheSuccess = new AttributeCache(PARTITION_ID_KEY, successMap); Map failureMap = new HashMap<>(commonAttributesMap); - failureMap.put(SEND_STATUS_KEY, "error"); + failureMap.put(GENERIC_STATUS_KEY, "error"); this.sendAttributeCacheFailure = new AttributeCache(PARTITION_ID_KEY, failureMap); this.receiveAttributeCache = new AttributeCache(PARTITION_ID_KEY, commonAttributesMap); @@ -59,7 +59,6 @@ public EventHubsMetricsProvider(Meter meter, String namespace, String entityName public void reportBatchSend(EventDataBatch batch, String partitionId, Throwable throwable, Context context) { if (isEnabled && sentEventsCounter.isEnabled()) { - // entity name is already reported AttributeCache cache = throwable == null ? sendAttributeCacheSuccess : sendAttributeCacheFailure; sentEventsCounter.add(batch.getCount(), cache.getOrCreate(partitionId), context); } From 644df6f33e679d628e456a52fd1c5ec156593297 Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Wed, 28 Sep 2022 10:28:45 -0700 Subject: [PATCH 5/5] Review --- .../checkpointstore/blob/MetricsHelper.java | 14 +++++++------- .../implementation/EventHubsMetricsProvider.java | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java index 27d68b90e9769..c0adb39bdacdd 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java @@ -66,8 +66,8 @@ final class MetricsHelper { this.meter = meterProvider.createMeter(LIBRARY_NAME, LIBRARY_VERSION, metricsOptions); this.isEnabled = this.meter.isEnabled(); } else { - this.isEnabled = false; this.meter = null; + this.isEnabled = false; } if (isEnabled) { @@ -115,7 +115,7 @@ private TelemetryAttributes getOrCreate(ConcurrentHashMap createAttributes(Checkpoint checkpoint, String status) { - Map attributesMap = new HashMap<>(); + Map attributesMap = new HashMap<>(5); attributesMap.put(HOSTNAME_KEY, checkpoint.getFullyQualifiedNamespace()); attributesMap.put(ENTITY_NAME_KEY, checkpoint.getEventHubName()); attributesMap.put(PARTITION_ID_KEY, checkpoint.getPartitionId()); @@ -132,11 +132,13 @@ private void updateCurrentValue(String attributesId, Checkpoint checkpoint) { return; } - CurrentValue valueSupplier; + final CurrentValue valueSupplier; if (maxCapacityReached) { valueSupplier = seqNoSubscriptions.get(attributesId); + if (valueSupplier == null) { + return; + } } else { - TelemetryAttributes attributes = getOrCreate(common, attributesId, checkpoint, null); if (attributes == null) { return; @@ -148,9 +150,7 @@ private void updateCurrentValue(String attributesId, Checkpoint checkpoint) { }); } - if (valueSupplier != null) { - valueSupplier.set(checkpoint.getSequenceNumber()); - } + valueSupplier.set(checkpoint.getSequenceNumber()); } private static boolean areMetricsEnabled(MetricsOptions options) { diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java index d015b7e682041..54638e5aaecec 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubsMetricsProvider.java @@ -36,7 +36,7 @@ public EventHubsMetricsProvider(Meter meter, String namespace, String entityName this.meter = meter; this.isEnabled = meter != null && meter.isEnabled(); if (this.isEnabled) { - Map commonAttributesMap = new HashMap<>(); + Map commonAttributesMap = new HashMap<>(3); commonAttributesMap.put(HOSTNAME_KEY, namespace); commonAttributesMap.put(ENTITY_NAME_KEY, entityName); if (consumerGroup != null) {