Skip to content

Commit

Permalink
EventHubs tracing and metrics: catch up with OTel specification (#38899)
Browse files Browse the repository at this point in the history
* Update EH to the latest otel spec and rework metrics

* generic operation duration and nits

* fix tests and update

* test: remove v2 stack

* tryme

* tryme

* up

* up

* up

* up

* remove global otel

* up

* up

* up

* up

* up

* up

* lint and stress test fixes

* Rebase and review comments

* Rebase and review comments

* nits
  • Loading branch information
lmolkova authored Sep 19, 2024
1 parent 104453b commit f37c884
Show file tree
Hide file tree
Showing 48 changed files with 3,266 additions and 1,495 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,8 @@
<suppress checks="IllegalImport" files="com.azure.messaging.servicebus.IntegrationTestBase.java"/>
<suppress checks="IllegalImport" files="com.azure.messaging.servicebus.TracingIntegrationTests.java"/>
<suppress checks="IllegalImport" files="com.azure.messaging.servicebus.LoggingSpanProcessor.java"/>
<suppress checks="IllegalImport" files="com.azure.messaging.eventhubs.TracingIntegrationTests.java"/>
<suppress checks="IllegalImport" files="com.azure.messaging.eventhubs.PublishEventsTracingWithCustomContextSample.java"/>
<suppress checks="IllegalImport" files="com.azure.messaging.eventhubs.EventProcessorWithTracingClientSample.java"/>
<suppress checks="IllegalImport" files="com.azure.messaging.eventhubs.*Sample.*java"/>
<suppress checks="IllegalImport" files="com.azure.messaging.eventhubs.*Test.*java"/>

<!-- AzureSpringMonitorConfig defines Spring bean used in https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/OpenTelemetryAutoConfiguration.java-->
<suppress checks="com.azure.tools.checkstyle.checks.ExternalDependencyExposedCheck"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import static com.azure.core.amqp.implementation.AmqpLoggingUtils.addErrorCondition;
import static com.azure.core.amqp.implementation.ClientConstants.SESSION_ID_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.ERROR_DESCRIPTION_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.SESSION_NAME_KEY;

/**
Expand Down Expand Up @@ -172,6 +173,11 @@ public void onSessionFinal(Event e) {
}

private void onSessionTimeout() {
logger.atWarning()
.addKeyValue(SESSION_NAME_KEY, sessionName)
.addKeyValue(ERROR_DESCRIPTION_KEY, "timeout")
.log("onSessionTimeout");

// It is supposed to close a local session to handle timeout exception.
// However, closing the session can result in NPE because of proton-j bug (https://issues.apache
// .org/jira/browse/PROTON-1939).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

### Breaking Changes

- Experimental checkpointing metrics are no longer reported by this package. They've been moved to `azure-messaging-eventhubs` package.
([#38899](https://github.com/Azure/azure-sdk-for-java/pull/38899))

### Bugs Fixed

### Other Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.azure.core.util.ClientOptions;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.metrics.MeterProvider;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.models.Checkpoint;
Expand All @@ -24,13 +23,11 @@
import reactor.core.publisher.Mono;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -70,7 +67,6 @@ public class BlobCheckpointStore implements CheckpointStore {
private static final ClientLogger LOGGER = new ClientLogger(BlobCheckpointStore.class);

private final BlobContainerAsyncClient blobContainerAsyncClient;
private final MetricsHelper metricsHelper;
private final Map<String, BlobAsyncClient> blobClients = new ConcurrentHashMap<>();

/**
Expand All @@ -93,7 +89,6 @@ public BlobCheckpointStore(BlobContainerAsyncClient blobContainerAsyncClient) {
*/
public BlobCheckpointStore(BlobContainerAsyncClient blobContainerAsyncClient, ClientOptions options) {
this.blobContainerAsyncClient = blobContainerAsyncClient;
this.metricsHelper = new MetricsHelper(options == null ? null : options.getMetricsOptions(), MeterProvider.getDefaultProvider());
}

/**
Expand Down Expand Up @@ -267,30 +262,14 @@ public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
metadata.put(OFFSET, offset);
BlobAsyncClient blobAsyncClient = blobClients.get(blobName);

Mono<Void> response = blobAsyncClient.exists().flatMap(exists -> {
return blobAsyncClient.exists().flatMap(exists -> {
if (exists) {
return blobAsyncClient.setMetadata(metadata);
} else {
return blobAsyncClient.getBlockBlobAsyncClient().uploadWithResponse(Flux.just(UPLOAD_DATA), 0, null,
metadata, null, null, null).then();
}
});
return reportMetrics(response, checkpoint, blobName);
}

private Mono<Void> reportMetrics(Mono<Void> checkpointMono, Checkpoint checkpoint, String blobName) {
AtomicReference<Instant> startTime = metricsHelper.isCheckpointDurationEnabled() ? new AtomicReference<>() : null;
return checkpointMono
.doOnEach(signal -> {
if (signal.isOnComplete() || signal.isOnError()) {
metricsHelper.reportCheckpoint(checkpoint, blobName, !signal.hasError(), startTime != null ? startTime.get() : null);
}
})
.doOnSubscribe(ignored -> {
if (startTime != null) {
startTime.set(Instant.now());
}
});
}

private String getBlobPrefix(String fullyQualifiedNamespace, String eventHubName, String consumerGroupName,
Expand Down Expand Up @@ -348,5 +327,4 @@ private Mono<PartitionOwnership> convertToPartitionOwnership(BlobItem blobItem)

return Mono.empty();
}

}

This file was deleted.

Loading

0 comments on commit f37c884

Please sign in to comment.