diff --git a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md index 8a4da28f6bb7d..2415a881bc4fd 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- Fixes trace context propagation issue: links to *message* spans were not populated on *send* span. ([#28951](https://github.com/Azure/azure-sdk-for-java/pull/28951)) + ### Other Changes ## 5.12.0 (2022-05-16) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java index 4067b57c9fe96..004b0383b4805 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java @@ -155,13 +155,17 @@ private EventData traceMessageSpan(EventData eventData) { .addData(AZ_TRACING_NAMESPACE_KEY, AZ_NAMESPACE_VALUE) .addData(ENTITY_PATH_KEY, this.entityPath) .addData(HOST_NAME_KEY, this.hostname); - Context eventSpanContext = tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, eventContext, + eventContext = tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, eventContext, ProcessKind.MESSAGE); - Optional eventDiagnosticIdOptional = eventSpanContext.getData(DIAGNOSTIC_ID_KEY); + Optional eventDiagnosticIdOptional = eventContext.getData(DIAGNOSTIC_ID_KEY); if (eventDiagnosticIdOptional.isPresent()) { eventData.getProperties().put(DIAGNOSTIC_ID_KEY, eventDiagnosticIdOptional.get().toString()); - tracerProvider.endSpan(eventSpanContext, Signal.complete()); - eventData.addContext(SPAN_CONTEXT_KEY, eventSpanContext); + tracerProvider.endSpan(eventContext, Signal.complete()); + + Object spanContext = eventContext.getData(SPAN_CONTEXT_KEY).orElse(null); + if (spanContext != null) { + eventData.addContext(SPAN_CONTEXT_KEY, spanContext); + } } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java index a6a723f5b96a9..c464c20926983 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java @@ -539,13 +539,21 @@ public Mono send(EventDataBatch batch) { for (int i = 0; i < batch.getEvents().size(); i++) { final EventData event = batch.getEvents().get(i); if (isTracingEnabled) { - parentContext.set(event.getContext()); if (i == 0) { - sharedContext = tracerProvider.getSharedSpanBuilder(ClientConstants.AZ_TRACING_SERVICE_NAME, - parentContext.get()); + sharedContext = event.getContext() + .addData(AZ_TRACING_NAMESPACE_KEY, AZ_NAMESPACE_VALUE) + .addData(ENTITY_PATH_KEY, eventHubName) + .addData(HOST_NAME_KEY, fullyQualifiedNamespace); + + sharedContext = tracerProvider.getSharedSpanBuilder(ClientConstants.AZ_TRACING_SERVICE_NAME, sharedContext); + tracerProvider.addSpanLinks(sharedContext); + } else { + // TODO (lmolkova) we need better addSpanLinks - https://github.com/Azure/azure-sdk-for-java/issues/28953 + Object eventSpanContext = event.getContext().getData(SPAN_CONTEXT_KEY).orElse(Context.NONE); + tracerProvider.addSpanLinks(sharedContext.addData(SPAN_CONTEXT_KEY, eventSpanContext)); } - tracerProvider.addSpanLinks(sharedContext.addData(SPAN_CONTEXT_KEY, event.getContext())); } + final Message message = messageSerializer.serialize(event); if (!CoreUtils.isNullOrEmpty(partitionKey)) { @@ -559,14 +567,8 @@ public Mono send(EventDataBatch batch) { } if (isTracingEnabled) { - final Context finalSharedContext = sharedContext == null - ? Context.NONE - : sharedContext - .addData(ENTITY_PATH_KEY, eventHubName) - .addData(HOST_NAME_KEY, fullyQualifiedNamespace) - .addData(AZ_TRACING_NAMESPACE_KEY, AZ_NAMESPACE_VALUE); // Start send span and store updated context - parentContext.set(tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, finalSharedContext, ProcessKind.SEND)); + parentContext.set(tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, sharedContext, ProcessKind.SEND)); } final Mono sendMessage = getSendLink(batch.getPartitionId()) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java index 5c149372ccfca..f0270e0740ea9 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java @@ -58,22 +58,26 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY; import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY; import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY; import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY; -import static com.azure.core.util.tracing.Tracer.PARENT_SPAN_KEY; +import static com.azure.core.util.tracing.Tracer.PARENT_TRACE_CONTEXT_KEY; import static com.azure.core.util.tracing.Tracer.SPAN_BUILDER_KEY; +import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; import static com.azure.messaging.eventhubs.implementation.ClientConstants.AZ_NAMESPACE_VALUE; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -318,60 +322,69 @@ void partitionProducerCannotSendWithPartitionKey() { */ @Test void sendStartSpanSingleMessage() { + final Flux testData = Flux.just( + new EventData(TEST_CONTENTS.getBytes(UTF_8))); + final SendOptions sendOptions = new SendOptions(); + // Arrange final Tracer tracer1 = mock(Tracer.class); final List tracers = Collections.singletonList(tracer1); TracerProvider tracerProvider = new TracerProvider(tracers); - final Flux testData = Flux.just( - new EventData(TEST_CONTENTS.getBytes(UTF_8)), - new EventData(TEST_CONTENTS.getBytes(UTF_8))); - final String partitionId = "my-partition-id"; - final SendOptions sendOptions = new SendOptions() - .setPartitionId(partitionId); final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed); - when(connection.createSendLink( - argThat(name -> name.endsWith(partitionId)), argThat(name -> name.endsWith(partitionId)), eq(retryOptions))) + when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), any())) .thenReturn(Mono.just(sendLink)); - + when(sendLink.getHostname()).thenReturn(HOSTNAME); + when(sendLink.getEntityPath()).thenReturn(EVENT_HUB_NAME); when(sendLink.send(anyList())).thenReturn(Mono.empty()); + when(sendLink.send(any(Message.class))).thenReturn(Mono.empty()); when(tracer1.start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND))).thenAnswer( invocation -> { Context passed = invocation.getArgument(1, Context.class); assertEquals(passed.getData(AZ_TRACING_NAMESPACE_KEY).get(), AZ_NAMESPACE_VALUE); - return passed.addData(PARENT_SPAN_KEY, "value"); + return passed.addData(PARENT_TRACE_CONTEXT_KEY, "value"); } ); - when(tracer1.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE))).thenAnswer( invocation -> { Context passed = invocation.getArgument(1, Context.class); assertEquals(passed.getData(AZ_TRACING_NAMESPACE_KEY).get(), AZ_NAMESPACE_VALUE); - return passed.addData(PARENT_SPAN_KEY, "value").addData(DIAGNOSTIC_ID_KEY, "value2"); + return passed.addData(PARENT_TRACE_CONTEXT_KEY, "value") + .addData(DIAGNOSTIC_ID_KEY, "diag-id") + .addData(SPAN_CONTEXT_KEY, "span-context"); } ); - when(tracer1.getSharedSpanBuilder(eq("EventHubs.send"), any())).thenAnswer( invocation -> { Context passed = invocation.getArgument(1, Context.class); - return passed.addData(SPAN_BUILDER_KEY, "value"); + return passed.addData(SPAN_BUILDER_KEY, "span-builder"); } ); + doAnswer( + invocation -> { + Context passed = invocation.getArgument(0, Context.class); + assertEquals("span-builder", passed.getData(SPAN_BUILDER_KEY).orElseGet(null)); + assertEquals("span-context", passed.getData(SPAN_CONTEXT_KEY).orElseGet(null)); + return null; + }).when(tracer1).addLink(any()); + // Act StepVerifier.create(asyncProducer.send(testData, sendOptions)) .verifyComplete(); - // Assert + //Assert verify(tracer1, times(1)) .start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND)); - verify(tracer1, times(2)) + verify(tracer1, times(1)) .start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE)); - verify(tracer1, times(3)).end(eq("success"), isNull(), any()); + verify(tracer1, times(2)).end(eq("success"), isNull(), any()); + verify(tracer1, times(1)).getSharedSpanBuilder(eq("EventHubs.send"), any()); + verify(tracer1, times(1)).addLink(any()); verifyNoInteractions(onClientClosed); } @@ -391,29 +404,25 @@ void sendMessageRetrySpanTest() { tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed); final String failureKey = "fail"; - final EventData testData = new EventData("test"); + final EventData testData = new EventData("test") + .addContext(SPAN_CONTEXT_KEY, "span-context"); testData.getProperties().put(failureKey, "true"); when(tracer1.start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND))).thenAnswer( invocation -> { Context passed = invocation.getArgument(1, Context.class); - assertEquals(passed.getData(AZ_TRACING_NAMESPACE_KEY).get(), AZ_NAMESPACE_VALUE); - return passed.addData(PARENT_SPAN_KEY, "value").addData(HOST_NAME_KEY, "value2"); - } - ); - - when(tracer1.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE))).thenAnswer( - invocation -> { - Context passed = invocation.getArgument(1, Context.class); - assertEquals(passed.getData(AZ_TRACING_NAMESPACE_KEY).get(), AZ_NAMESPACE_VALUE); - return passed.addData(PARENT_SPAN_KEY, "value").addData(DIAGNOSTIC_ID_KEY, "value2"); + assertEquals(AZ_NAMESPACE_VALUE, passed.getData(AZ_TRACING_NAMESPACE_KEY).get()); + assertEquals(HOSTNAME, passed.getData(HOST_NAME_KEY).get()); + assertEquals(EVENT_HUB_NAME, passed.getData(ENTITY_PATH_KEY).get()); + return passed.addData(PARENT_TRACE_CONTEXT_KEY, "trace-context"); } ); when(tracer1.getSharedSpanBuilder(eq("EventHubs.send"), any())).thenAnswer( invocation -> { Context passed = invocation.getArgument(1, Context.class); - return passed.addData(SPAN_BUILDER_KEY, "value"); + assertEquals("span-context", passed.getData("span-context").orElseGet(null)); + return passed.addData(SPAN_BUILDER_KEY, "span-builder"); } ); @@ -432,14 +441,15 @@ void sendMessageRetrySpanTest() { .thenReturn(Mono.empty()); producer.send(testData).block(); + assertFalse(testData.getProperties().containsKey(DIAGNOSTIC_ID_KEY)); //Assert - verify(tracer1, times(1)) - .start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND)); - verify(tracer1, times(1)) - .start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE)); + verify(tracer1, times(1)).start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND)); + verify(tracer1, never()).start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE)); verify(tracer1, times(1)).addLink(any()); - verify(tracer1, times(2)).end(eq("success"), isNull(), any()); + verify(tracer1, times(1)).getSharedSpanBuilder(eq("EventHubs.send"), any()); + verify(tracer1, times(1)).end(eq("success"), isNull(), any()); + verifyNoMoreInteractions(onClientClosed); } @@ -529,37 +539,75 @@ void startMessageSpansOnCreateBatch() { final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed); - final AmqpSendLink link = mock(AmqpSendLink.class); - - when(link.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES)); - when(link.getHostname()).thenReturn(HOSTNAME); - when(link.getEntityPath()).thenReturn(ENTITY_PATH); // EC is the prefix they use when creating a link that sends to the service round-robin. when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), eq(retryOptions))) - .thenReturn(Mono.just(link)); + .thenReturn(Mono.just(sendLink)); + when(sendLink.getHostname()).thenReturn(HOSTNAME); + when(sendLink.getEntityPath()).thenReturn(EVENT_HUB_NAME); + when(sendLink.send(anyList())).thenReturn(Mono.empty()); + when(sendLink.send(any(Message.class))).thenReturn(Mono.empty()); + final AtomicReference eventInd = new AtomicReference<>(0); when(tracer1.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE))).thenAnswer( invocation -> { Context passed = invocation.getArgument(1, Context.class); - assertEquals(passed.getData(AZ_TRACING_NAMESPACE_KEY).get(), AZ_NAMESPACE_VALUE); - assertEquals(passed.getData(ENTITY_PATH_KEY).get(), ENTITY_PATH); - assertEquals(passed.getData(HOST_NAME_KEY).get(), HOSTNAME); + return passed.addData(PARENT_TRACE_CONTEXT_KEY, "span") + .addData(DIAGNOSTIC_ID_KEY, eventInd.get().toString()) + .addData(SPAN_CONTEXT_KEY, eventInd.get()); + } + ); + + when(tracer1.getSharedSpanBuilder(eq("EventHubs.send"), any())).thenAnswer( + invocation -> { + Context passed = invocation.getArgument(1, Context.class); + assertEquals(0, passed.getData(SPAN_CONTEXT_KEY).orElseGet(null)); + return passed.addData(SPAN_BUILDER_KEY, "span-builder"); + } + ); - return passed.addData(PARENT_SPAN_KEY, "value").addData(DIAGNOSTIC_ID_KEY, "value2"); + final AtomicReference linkNumber = new AtomicReference<>(0); + doAnswer( + invocation -> { + Context passed = invocation.getArgument(0, Context.class); + assertEquals("span-builder", passed.getData(SPAN_BUILDER_KEY).orElseGet(null)); + assertEquals(linkNumber.get(), passed.getData(SPAN_CONTEXT_KEY).orElseGet(null)); + linkNumber.set(linkNumber.get() + 1); + return null; + }).when(tracer1).addLink(any()); + + when(tracer1.start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND))).thenAnswer( + invocation -> { + Context passed = invocation.getArgument(1, Context.class); + assertEquals(EVENT_HUB_NAME, passed.getData(ENTITY_PATH_KEY).orElseGet(null)); + assertEquals(HOSTNAME, passed.getData(HOST_NAME_KEY).orElseGet(null)); + assertEquals(AZ_NAMESPACE_VALUE, passed.getData(AZ_TRACING_NAMESPACE_KEY).orElseGet(null)); + return passed.addData(PARENT_TRACE_CONTEXT_KEY, "span"); } ); // Act & Assert - StepVerifier.create(asyncProducer.createBatch()) - .assertNext(batch -> { - Assertions.assertTrue(batch.tryAdd(new EventData("Hello World".getBytes(UTF_8)))); - }) + StepVerifier.create(asyncProducer.createBatch() + .flatMap(batch -> { + final EventData data0 = new EventData("Hello World".getBytes(UTF_8)); + Assertions.assertTrue(batch.tryAdd(data0)); + assertEquals("0", data0.getProperties().get(DIAGNOSTIC_ID_KEY)); + + eventInd.set(1); + final EventData data1 = new EventData("Hello World".getBytes(UTF_8)); + Assertions.assertTrue(batch.tryAdd(data1)); + assertEquals("1", data1.getProperties().get(DIAGNOSTIC_ID_KEY)); + return asyncProducer.send(batch); + })) .verifyComplete(); - verify(tracer1, times(1)) + verify(tracer1, times(2)) .start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE)); - verify(tracer1, times(1)).end(eq("success"), isNull(), any()); + verify(tracer1, times(1)).getSharedSpanBuilder(eq("EventHubs.send"), any()); + verify(tracer1, times(2)).addLink(any()); + verify(tracer1, times(1)).start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND)); + verify(tracer1, times(2)).start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE)); + verify(tracer1, times(3)).end(eq("success"), isNull(), any()); verifyNoInteractions(onClientClosed); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java index c1796c3e7ec20..c5093705f9fdb 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java @@ -44,20 +44,25 @@ import java.time.Duration; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY; import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY; -import static com.azure.core.util.tracing.Tracer.PARENT_SPAN_KEY; +import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY; +import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY; +import static com.azure.core.util.tracing.Tracer.PARENT_TRACE_CONTEXT_KEY; import static com.azure.core.util.tracing.Tracer.SPAN_BUILDER_KEY; import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; import static com.azure.messaging.eventhubs.implementation.ClientConstants.AZ_NAMESPACE_VALUE; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -97,6 +102,8 @@ public void setup() { when(sendLink.getErrorContext()).thenReturn(new AmqpErrorContext("test-namespace")); when(sendLink.send(anyList())).thenReturn(Mono.empty()); when(sendLink.send(any(Message.class))).thenReturn(Mono.empty()); + when(sendLink.getHostname()).thenReturn(HOSTNAME); + when(sendLink.getEntityPath()).thenReturn(EVENT_HUB_NAME); final TracerProvider tracerProvider = new TracerProvider(Collections.emptyList()); @@ -175,26 +182,37 @@ public void sendStartSpanSingleMessage() { invocation -> { Context passed = invocation.getArgument(1, Context.class); assertEquals(passed.getData(AZ_TRACING_NAMESPACE_KEY).get(), AZ_NAMESPACE_VALUE); - return passed.addData(PARENT_SPAN_KEY, "value"); + return passed.addData(PARENT_TRACE_CONTEXT_KEY, "value"); } ); when(tracer1.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE))).thenAnswer( invocation -> { Context passed = invocation.getArgument(1, Context.class); assertEquals(passed.getData(AZ_TRACING_NAMESPACE_KEY).get(), AZ_NAMESPACE_VALUE); - return passed.addData(PARENT_SPAN_KEY, "value").addData(DIAGNOSTIC_ID_KEY, "value2"); + return passed.addData(PARENT_TRACE_CONTEXT_KEY, "value") + .addData(DIAGNOSTIC_ID_KEY, "diag-id") + .addData(SPAN_CONTEXT_KEY, "span-context"); } ); when(tracer1.getSharedSpanBuilder(eq("EventHubs.send"), any())).thenAnswer( invocation -> { Context passed = invocation.getArgument(1, Context.class); - return passed.addData(SPAN_BUILDER_KEY, "value"); + return passed.addData(SPAN_BUILDER_KEY, "span-builder"); } ); + doAnswer( + invocation -> { + Context passed = invocation.getArgument(0, Context.class); + assertEquals("span-builder", passed.getData(SPAN_BUILDER_KEY).orElseGet(null)); + assertEquals("span-context", passed.getData(SPAN_CONTEXT_KEY).orElseGet(null)); + return null; + }).when(tracer1).addLink(any()); + //Act try { producer.send(eventData); + assertEquals("diag-id", eventData.getProperties().get(DIAGNOSTIC_ID_KEY)); } finally { producer.close(); } @@ -205,6 +223,8 @@ public void sendStartSpanSingleMessage() { verify(tracer1, times(1)) .start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE)); verify(tracer1, times(2)).end(eq("success"), isNull(), any()); + verify(tracer1, times(1)).getSharedSpanBuilder(eq("EventHubs.send"), any()); + verify(tracer1, times(1)).addLink(any()); verifyNoInteractions(onClientClosed); } @@ -227,26 +247,30 @@ public void sendMessageRetrySpanTest() { connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed); final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); final EventData eventData = new EventData("hello-world".getBytes(UTF_8)) - .addContext(SPAN_CONTEXT_KEY, Context.NONE); + .addContext(SPAN_CONTEXT_KEY, "span-context"); when(tracer1.start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND))).thenAnswer( invocation -> { Context passed = invocation.getArgument(1, Context.class); - assertEquals(passed.getData(AZ_TRACING_NAMESPACE_KEY).get(), AZ_NAMESPACE_VALUE); - return passed.addData(PARENT_SPAN_KEY, "value"); + assertEquals(AZ_NAMESPACE_VALUE, passed.getData(AZ_TRACING_NAMESPACE_KEY).get()); + assertEquals(HOSTNAME, passed.getData(HOST_NAME_KEY).get()); + assertEquals(EVENT_HUB_NAME, passed.getData(ENTITY_PATH_KEY).get()); + return passed.addData(PARENT_TRACE_CONTEXT_KEY, "trace-context"); } ); when(tracer1.getSharedSpanBuilder(eq("EventHubs.send"), any())).thenAnswer( invocation -> { Context passed = invocation.getArgument(1, Context.class); - return passed.addData(SPAN_BUILDER_KEY, "value"); + assertEquals("span-context", passed.getData("span-context").orElseGet(null)); + return passed.addData(SPAN_BUILDER_KEY, "span-builder"); } ); //Act try { producer.send(eventData); + assertFalse(eventData.getProperties().containsKey(DIAGNOSTIC_ID_KEY)); } finally { producer.close(); } @@ -255,6 +279,7 @@ public void sendMessageRetrySpanTest() { verify(tracer1, times(1)).start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND)); verify(tracer1, never()).start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE)); verify(tracer1, times(1)).addLink(any()); + verify(tracer1, times(1)).getSharedSpanBuilder(eq("EventHubs.send"), any()); verify(tracer1, times(1)).end(eq("success"), isNull(), any()); verifyNoInteractions(onClientClosed); @@ -385,18 +410,56 @@ public void startsMessageSpanOnEventBatch() { when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), any())) .thenReturn(Mono.just(sendLink)); + final AtomicReference eventInd = new AtomicReference<>(0); when(tracer1.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE))).thenAnswer( invocation -> { Context passed = invocation.getArgument(1, Context.class); - return passed.addData(PARENT_SPAN_KEY, "value").addData(DIAGNOSTIC_ID_KEY, "value2"); + return passed.addData(PARENT_TRACE_CONTEXT_KEY, "span") + .addData(DIAGNOSTIC_ID_KEY, eventInd.get().toString()) + .addData(SPAN_CONTEXT_KEY, eventInd.get()); + } + ); + + when(tracer1.getSharedSpanBuilder(eq("EventHubs.send"), any())).thenAnswer( + invocation -> { + Context passed = invocation.getArgument(1, Context.class); + assertEquals(0, passed.getData(SPAN_CONTEXT_KEY).orElseGet(null)); + return passed.addData(SPAN_BUILDER_KEY, "span-builder"); + } + ); + + final AtomicReference linkNumber = new AtomicReference<>(0); + doAnswer( + invocation -> { + Context passed = invocation.getArgument(0, Context.class); + assertEquals("span-builder", passed.getData(SPAN_BUILDER_KEY).orElseGet(null)); + assertEquals(linkNumber.get(), passed.getData(SPAN_CONTEXT_KEY).orElseGet(null)); + linkNumber.set(linkNumber.get() + 1); + return null; + }).when(tracer1).addLink(any()); + + when(tracer1.start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND))).thenAnswer( + invocation -> { + Context passed = invocation.getArgument(1, Context.class); + assertEquals(EVENT_HUB_NAME, passed.getData(ENTITY_PATH_KEY).orElseGet(null)); + assertEquals(HOSTNAME, passed.getData(HOST_NAME_KEY).orElseGet(null)); + assertEquals(AZ_NAMESPACE_VALUE, passed.getData(AZ_TRACING_NAMESPACE_KEY).orElseGet(null)); + return passed.addData(PARENT_TRACE_CONTEXT_KEY, "span"); } ); // Act & Assert try { final EventDataBatch batch = producer.createBatch(); - Assertions.assertTrue(batch.tryAdd(new EventData("Hello World".getBytes(UTF_8)))); - Assertions.assertTrue(batch.tryAdd(new EventData("Test World".getBytes(UTF_8)))); + final EventData data0 = new EventData("Hello World".getBytes(UTF_8)); + Assertions.assertTrue(batch.tryAdd(data0)); + assertEquals("0", data0.getProperties().get(DIAGNOSTIC_ID_KEY)); + + eventInd.set(1); + final EventData data1 = new EventData("Hello World".getBytes(UTF_8)); + Assertions.assertTrue(batch.tryAdd(data1)); + assertEquals("1", data1.getProperties().get(DIAGNOSTIC_ID_KEY)); + producer.send(batch); } finally { producer.close(); } @@ -404,7 +467,11 @@ public void startsMessageSpanOnEventBatch() { verify(tracer1, times(2)) .start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE)); - verify(tracer1, times(2)).end(eq("success"), isNull(), any()); + verify(tracer1, times(1)).getSharedSpanBuilder(eq("EventHubs.send"), any()); + verify(tracer1, times(2)).addLink(any()); + verify(tracer1, times(1)).start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND)); + verify(tracer1, times(2)).start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE)); + verify(tracer1, times(3)).end(eq("success"), isNull(), any()); verifyNoInteractions(onClientClosed); }