diff --git a/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs b/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs index 1162f31..211666d 100644 --- a/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs +++ b/extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs @@ -6,6 +6,7 @@ using System.Diagnostics; using System.Globalization; using System.Linq; +using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.WebJobs.Host.Executors; @@ -20,6 +21,9 @@ namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ { internal sealed class RabbitMQListener : IListener, IScaleMonitor { +#pragma warning disable SA1000 + private static readonly ActivitySource ActivitySource = new("Microsoft.Azure.WebJobs.Extensions.RabbitMQ"); +#pragma warning restore SA1000 private readonly ITriggeredFunctionExecutor executor; private readonly string queueName; private readonly ushort prefetchCount; @@ -82,6 +86,8 @@ public Task StartAsync(CancellationToken cancellationToken) this.consumer.Received += async (model, ea) => { + using Activity activity = StartActivity(ea); + var input = new TriggeredFunctionData() { TriggerValue = ea }; FunctionResult result = await this.executor.TryExecuteAsync(input, cancellationToken).ConfigureAwait(false); @@ -152,6 +158,26 @@ public ScaleStatus GetScaleStatus(ScaleStatusContext con return this.GetScaleStatusCore(context.WorkerCount, context.Metrics?.ToArray()); } + internal static Activity StartActivity(BasicDeliverEventArgs ea) + { + Activity activity; + if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey("traceparent")) + { + byte[] traceParentIdInBytes = ea.BasicProperties.Headers["traceparent"] as byte[]; + string traceparentId = Encoding.UTF8.GetString(traceParentIdInBytes); + activity = ActivitySource.StartActivity("Trigger", ActivityKind.Consumer, traceparentId); + } + else + { + activity = ActivitySource.StartActivity("Trigger", ActivityKind.Consumer); + ea.BasicProperties.Headers ??= new Dictionary(); + byte[] traceParentIdInBytes = Encoding.UTF8.GetBytes(activity.Id); + ea.BasicProperties.Headers["traceparent"] = traceParentIdInBytes; + } + + return activity; + } + internal void CreateHeadersAndRepublish(BasicDeliverEventArgs ea) { if (ea.BasicProperties.Headers == null) diff --git a/extension/WebJobs.Extensions.RabbitMQ/WebJobs.Extensions.RabbitMQ.csproj b/extension/WebJobs.Extensions.RabbitMQ/WebJobs.Extensions.RabbitMQ.csproj index 1cd4b74..74fed4c 100644 --- a/extension/WebJobs.Extensions.RabbitMQ/WebJobs.Extensions.RabbitMQ.csproj +++ b/extension/WebJobs.Extensions.RabbitMQ/WebJobs.Extensions.RabbitMQ.csproj @@ -58,6 +58,7 @@ runtime; build; native; contentfiles; analyzers; buildtransitive all +