Skip to content

Commit

Permalink
generic operation duration and nits
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Sep 11, 2024
1 parent 22b59ac commit 3d646b9
Show file tree
Hide file tree
Showing 16 changed files with 346 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.azure.messaging.eventhubs.implementation.AmqpReceiveLinkProcessor;
import com.azure.messaging.eventhubs.implementation.EventHubManagementNode;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsConsumerInstrumentation;
import com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentedMessageFlux;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
Expand Down Expand Up @@ -308,7 +309,7 @@ boolean isV2() {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<EventHubProperties> getEventHubProperties() {
return instrumentation.getTracer().traceMono(connectionProcessor.getManagementNodeWithRetries()
return instrumentation.instrumentMono(connectionProcessor.getManagementNodeWithRetries()
.flatMap(EventHubManagementNode::getEventHubProperties),
GET_EVENT_HUB_PROPERTIES, null);
}
Expand Down Expand Up @@ -341,7 +342,7 @@ public Mono<PartitionProperties> getPartitionProperties(String partitionId) {
return monoError(LOGGER, new IllegalArgumentException("'partitionId' cannot be an empty string."));
}

return instrumentation.getTracer().traceMono(
return instrumentation.instrumentMono(
connectionProcessor.getManagementNodeWithRetries().flatMap(node -> node.getPartitionProperties(partitionId)),
GET_PARTITION_PROPERTIES, partitionId);
}
Expand Down Expand Up @@ -589,8 +590,8 @@ private EventHubPartitionAsyncConsumer createPartitionConsumer(String linkName,

final MessageFluxWrapper linkMessageProcessor;
if (connectionProcessor.isV2()) {
final MessageFlux messageFlux = new MessageFlux(receiveLinkFlux, prefetchCount, CreditFlowMode.EmissionDriven, MessageFlux.NULL_RETRY_POLICY);
linkMessageProcessor = new MessageFluxWrapper(messageFlux);
MessageFlux messageFlux = new MessageFlux(receiveLinkFlux, prefetchCount, CreditFlowMode.EmissionDriven, MessageFlux.NULL_RETRY_POLICY);
linkMessageProcessor = new MessageFluxWrapper(InstrumentedMessageFlux.instrument(messageFlux, partitionId, instrumentation));
} else {
final AmqpReceiveLinkProcessor receiveLinkProcessor = receiveLinkFlux.subscribeWith(
new AmqpReceiveLinkProcessor(entityPath, prefetchCount, partitionId, connectionProcessor, instrumentation));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, i
}

if (consumer.isV2()) {
// TODO(limolkova) instrument
// Sync receiver instrumentation is implemented in the SynchronousReceiver class
return syncReceiver.receive(partitionId, startingPosition, defaultReceiveOptions, maximumMessageCount,
maximumWaitTime);
}
Expand Down Expand Up @@ -315,7 +315,7 @@ public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, i
}

if (consumer.isV2()) {
// TODO (instrument)
// Sync receiver instrumentation is implemented in the SynchronousReceiver class
return syncReceiver.receive(partitionId, startingPosition, receiveOptions, maximumMessageCount,
maximumWaitTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public String getEventHubName() {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<EventHubProperties> getEventHubProperties() {
return instrumentation.getTracer().traceMono(
return instrumentation.instrumentMono(
connectionProcessor.getManagementNodeWithRetries().flatMap(EventHubManagementNode::getEventHubProperties),
GET_EVENT_HUB_PROPERTIES, null);
}
Expand All @@ -324,7 +324,7 @@ public Flux<String> getPartitionIds() {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<PartitionProperties> getPartitionProperties(String partitionId) {
return instrumentation.getTracer().traceMono(
return instrumentation.instrumentMono(
connectionProcessor.getManagementNodeWithRetries().flatMap(node -> node.getPartitionProperties(partitionId)),
GET_PARTITION_PROPERTIES, partitionId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import com.azure.core.util.Context;
import com.azure.core.util.metrics.Meter;
import com.azure.core.util.tracing.SpanKind;
import com.azure.core.util.tracing.StartSpanOptions;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsMetricsProvider;
Expand All @@ -17,8 +16,11 @@

import java.util.function.BiConsumer;

import static com.azure.core.util.tracing.SpanKind.CLIENT;
import static com.azure.core.util.tracing.Tracer.PARENT_TRACE_CONTEXT_KEY;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_BATCH_MESSAGE_COUNT;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_DESTINATION_PARTITION_ID;
import static com.azure.messaging.eventhubs.implementation.instrumentation.OperationName.SEND;

class EventHubsProducerInstrumentation {
private final EventHubsTracer tracer;
Expand All @@ -39,7 +41,7 @@ <T> Mono<T> sendBatch(Mono<T> publisher, EventDataBatch batch) {

return Mono.using(
() -> new InstrumentationScope(tracer, meter, reportMetricsCallback)
.setSpan(startPublishSpanWithLinks(batch, Context.NONE)),
.setSpan(startPublishSpanWithLinks(batch)),
scope -> publisher
.doOnError(scope::setError)
.doOnCancel(scope::setCancelled),
Expand All @@ -50,12 +52,27 @@ public EventHubsTracer getTracer() {
return tracer;
}

private Context startPublishSpanWithLinks(EventDataBatch batch, Context context) {
public <T> Mono<T> instrumentMono(Mono<T> publisher, OperationName operationName, String partitionId) {
if (!isEnabled()) {
return publisher;
}

return Mono.using(
() -> new InstrumentationScope(tracer, meter, (m, s) -> m.reportGenericOperationDuration(operationName, partitionId, s))
.setSpan(tracer.startGenericOperationSpan(operationName, partitionId, Context.NONE)),
scope -> publisher
.doOnError(scope::setError)
.doOnCancel(scope::setCancelled)
.contextWrite(c -> c.put(PARENT_TRACE_CONTEXT_KEY, scope.getSpan())),
InstrumentationScope::close);
}

private Context startPublishSpanWithLinks(EventDataBatch batch) {
if (!tracer.isEnabled()) {
return context;
return Context.NONE;
}

StartSpanOptions startOptions = tracer.createStartOptions(SpanKind.CLIENT, OperationName.SEND, null);
StartSpanOptions startOptions = tracer.createStartOptions(CLIENT, SEND, null);
if (batch != null) {
startOptions.setAttribute(MESSAGING_BATCH_MESSAGE_COUNT, batch.getCount());
if (batch.getPartitionId() != null) {
Expand All @@ -66,7 +83,7 @@ private Context startPublishSpanWithLinks(EventDataBatch batch, Context context)
}
}

return tracer.startSpan(OperationName.SEND, startOptions, context);
return tracer.startSpan(SEND, startOptions, Context.NONE);
}

private boolean isEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

package com.azure.messaging.eventhubs;

import com.azure.core.amqp.implementation.MessageFlux;
import com.azure.messaging.eventhubs.implementation.AmqpReceiveLinkProcessor;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Flux;
Expand All @@ -12,10 +11,10 @@

final class MessageFluxWrapper {
private final AmqpReceiveLinkProcessor receiveLinkProcessor;
private final MessageFlux messageFlux;
private final Flux<Message> messageFlux;
private final boolean isV2;

MessageFluxWrapper(MessageFlux messageFlux) {
MessageFluxWrapper(Flux<Message> messageFlux) {
this.messageFlux = Objects.requireNonNull(messageFlux, "'messageFlux' cannot be null.");
this.receiveLinkProcessor = null;
this.isV2 = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ private WindowedSubscriber<PartitionEvent> createSubscriber(String partitionId)
final WindowedSubscriberOptions<PartitionEvent> options = new WindowedSubscriberOptions<>();
options.setWindowDecorator(toDecorate -> {
// Decorates the provided 'toDecorate' flux for tracing the signals (events, termination) it produces.
return toDecorate;
//return instrumentation.reportSyncReceiveSpan(SYNC_RECEIVE_SPAN_NAME, startTime, toDecorate, Context.NONE);
return instrumentation.syncReceive(toDecorate, partitionId);
});
return new WindowedSubscriber<>(Collections.singletonMap(PARTITION_ID_KEY, partitionId), TERMINAL_MESSAGE, options);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Instant;
import java.util.function.BiConsumer;

import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME;
import static com.azure.core.util.tracing.SpanKind.CONSUMER;
import static com.azure.core.util.tracing.SpanKind.CLIENT;
import static com.azure.core.util.tracing.Tracer.PARENT_TRACE_CONTEXT_KEY;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_BATCH_MESSAGE_COUNT;
import static com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationUtils.MESSAGING_DESTINATION_PARTITION_ID;
import static com.azure.messaging.eventhubs.implementation.instrumentation.OperationName.RECEIVE;
Expand All @@ -39,6 +41,7 @@ public EventHubsConsumerInstrumentation(Tracer tracer, Meter meter, String fully
this.isSync = isSyncConsumer;
}


public EventHubsTracer getTracer() {
return tracer;
}
Expand All @@ -52,7 +55,7 @@ public InstrumentationScope startAsyncConsume(Message message, String partitionI
return NOOP_SCOPE;
}

InstrumentationScope scope = createScope((m, s) -> {
InstrumentationScope scope = createScope((m, s) -> {
if (!isSync) {
m.reportProcess(1, partitionId, s);
}
Expand All @@ -62,8 +65,8 @@ public InstrumentationScope startAsyncConsume(Message message, String partitionI
ApplicationProperties properties = message.getApplicationProperties();
scope.setSpan(tracer.startProcessSpan(properties == null ? null : properties.getValue(),
enqueuedTime,
partitionId,
Context.NONE))
partitionId
))
.makeSpanCurrent();
}

Expand All @@ -78,7 +81,7 @@ public Flux<PartitionEvent> syncReceive(Flux<PartitionEvent> events, String part
return events;
}

StartSpanOptions startOptions = tracer.isEnabled() ? tracer.createStartOptions(CONSUMER, RECEIVE, partitionId) : null;
StartSpanOptions startOptions = tracer.isEnabled() ? tracer.createStartOptions(CLIENT, RECEIVE, partitionId) : null;
Integer[] receivedCount = new Integer[]{0};

return Flux.using(
Expand Down Expand Up @@ -119,7 +122,7 @@ public InstrumentationScope startProcess(EventBatchContext batchContext) {
m.reportProcess(batchContext.getEvents().size(), batchContext.getPartitionContext().getPartitionId(), s));

return scope
.setSpan(tracer.startProcessSpan(batchContext, Context.NONE))
.setSpan(tracer.startProcessSpan(batchContext))
.makeSpanCurrent();
}

Expand All @@ -134,15 +137,30 @@ public InstrumentationScope startProcess(EventContext eventContext) {

Context span = tracer.startProcessSpan(event.getProperties(),
event.getEnqueuedTime(),
eventContext.getPartitionContext().getPartitionId(),
Context.NONE);
eventContext.getPartitionContext().getPartitionId()
);

return scope
.setSpan(span)
.makeSpanCurrent();
}

boolean isEnabled() {
public <T> Mono<T> instrumentMono(Mono<T> publisher, OperationName operationName, String partitionId) {
if (!isEnabled()) {
return publisher;
}

return Mono.using(
() -> createScope((m, s) -> m.reportGenericOperationDuration(operationName, partitionId, s))
.setSpan(tracer.startGenericOperationSpan(operationName, partitionId, Context.NONE)),
scope -> publisher
.doOnError(scope::setError)
.doOnCancel(scope::setCancelled)
.contextWrite(c -> c.put(PARENT_TRACE_CONTEXT_KEY, scope.getSpan())),
InstrumentationScope::close);
}

public boolean isEnabled() {
return tracer.isEnabled() || meter.isEnabled();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public class EventHubsMetricsProvider {
private AttributeCache receiveAttributeCacheSuccess;
private AttributeCache checkpointAttributeCacheSuccess;
private AttributeCache processAttributeCacheSuccess;
private AttributeCache getPartitionPropertiesAttributeCacheSuccess;
private AttributeCache getEventHubPropertiesAttributeCacheSuccess;
private AttributeCache lagAttributeCache;
private LongCounter publishedEventCounter;
private LongCounter consumedEventCounter;
Expand All @@ -61,6 +63,8 @@ public EventHubsMetricsProvider(Meter meter, String namespace, String entityName
this.receiveAttributeCacheSuccess = AttributeCache.create(meter, RECEIVE, commonAttributes);
this.checkpointAttributeCacheSuccess = AttributeCache.create(meter, CHECKPOINT, commonAttributes);
this.processAttributeCacheSuccess = AttributeCache.create(meter, PROCESS, commonAttributes);
this.getPartitionPropertiesAttributeCacheSuccess = AttributeCache.create(meter, OperationName.GET_PARTITION_PROPERTIES, commonAttributes);
this.getEventHubPropertiesAttributeCacheSuccess = AttributeCache.create(meter, OperationName.GET_EVENT_HUB_PROPERTIES, commonAttributes);
this.lagAttributeCache = new AttributeCache(meter, MESSAGING_DESTINATION_PARTITION_ID, commonAttributes);

this.publishedEventCounter = meter.createLongCounter(MESSAGING_CLIENT_PUBLISHED_MESSAGES, "The number of published events", "{event}");
Expand Down Expand Up @@ -120,6 +124,13 @@ public void reportCheckpoint(Checkpoint checkpoint, InstrumentationScope scope)
}
}

public void reportGenericOperationDuration(OperationName operationName, String partitionId, InstrumentationScope scope) {
if (isEnabled && operationDuration.isEnabled()) {
operationDuration.record(getDurationInSeconds(scope.getStartTime()),
getOrCreateAttributes(operationName, partitionId, scope.getErrorType()), scope.getSpan());
}
}

private TelemetryAttributes getOrCreateAttributes(OperationName operationName, String partitionId, String errorType) {
if (errorType == null) {
switch (operationName) {
Expand All @@ -131,6 +142,10 @@ private TelemetryAttributes getOrCreateAttributes(OperationName operationName, S
return checkpointAttributeCacheSuccess.getOrCreate(partitionId);
case PROCESS:
return processAttributeCacheSuccess.getOrCreate(partitionId);
case GET_PARTITION_PROPERTIES:
return getPartitionPropertiesAttributeCacheSuccess.getOrCreate(partitionId);
case GET_EVENT_HUB_PROPERTIES:
return getEventHubPropertiesAttributeCacheSuccess.getOrCreate(partitionId);
default:
LOGGER.atVerbose()
.addKeyValue("operationName", operationName)
Expand Down
Loading

0 comments on commit 3d646b9

Please sign in to comment.