Skip to content

Commit

Permalink
Updates from working version.
Browse files Browse the repository at this point in the history
Add Cron and Servicebus Trigger Handlers, Fix some issues with the Http Trigger Handler, and downgrade the azure function packages as there's an issue with current versions around the Opentelemetry context (see Azure/azure-functions-dotnet-worker#2543)
  • Loading branch information
frozenskys committed Jul 25, 2024
1 parent 075fa25 commit 5e213fa
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,24 @@ public class ActivityTrackingMiddleware : IFunctionsWorkerMiddleware
private readonly HttpTriggerHandler HttpTriggerHandler;
private readonly ActivityTriggerHandler ActivityTriggerHandler;
private readonly OrchestrationTriggerHandler OrchestratorTriggerHandler;
private readonly ServiceBusTriggerHandler ServiceBusTriggerHandler;
private readonly CronTriggerHandler CronTriggerHandler;
private readonly FrozenDictionary<string, ICustomTriggerHandler> CustomTriggerHandlers;

public ActivityTrackingMiddleware(
HttpTriggerHandler httpTriggerHandler,
ActivityTriggerHandler activityTriggerHandler,
IEnumerable<ICustomTriggerHandler> customTriggerHandlers,
OrchestrationTriggerHandler orchestratorTriggerHandler)
OrchestrationTriggerHandler orchestratorTriggerHandler,
ServiceBusTriggerHandler serviceBusTriggerHandler,
CronTriggerHandler cronTriggerHandler)
{
HttpTriggerHandler = httpTriggerHandler ?? throw new ArgumentNullException(nameof(httpTriggerHandler));
CustomTriggerHandlers = customTriggerHandlers.ToFrozenDictionary(x => x.GetTriggerTypeName());
ActivityTriggerHandler = activityTriggerHandler ?? throw new ArgumentNullException(nameof(activityTriggerHandler));
OrchestratorTriggerHandler = orchestratorTriggerHandler ?? throw new ArgumentNullException(nameof(orchestratorTriggerHandler));
ServiceBusTriggerHandler = serviceBusTriggerHandler ?? throw new ArgumentNullException(nameof(serviceBusTriggerHandler));
CronTriggerHandler = cronTriggerHandler ?? throw new ArgumentNullException(nameof(cronTriggerHandler));
}

public async Task Invoke(FunctionContext context, FunctionExecutionDelegate next)
Expand All @@ -43,6 +49,8 @@ public async Task Invoke(FunctionContext context, FunctionExecutionDelegate next
"httpTrigger" => HttpTriggerHandler,
"activityTrigger" => ActivityTriggerHandler,
"orchestrationTrigger" => OrchestratorTriggerHandler,
"serviceBusTrigger" => ServiceBusTriggerHandler,
"timerTrigger" => CronTriggerHandler,
_ => CustomTriggerHandlers.TryGetValue(triggerType, out var h) ? h : null,
};

Expand All @@ -55,19 +63,20 @@ handler is null
context,
next);

if (activity is not null)
{
activity.SetTag(TraceSemanticConventions.AttributeFaasInvokedName, context.FunctionDefinition.Name);
activity.SetTag(TraceSemanticConventions.AttributeFaasExecution, context.InvocationId);
activity.SetTag(FunctionActivityConstants.Entrypoint, context.FunctionDefinition.EntryPoint);
activity.SetTag(FunctionActivityConstants.Id, context.FunctionDefinition.Id);
}
}
finally
{
activity?.Dispose();
}
}
if (activity is not null)
{
activity.SetTag(TraceSemanticConventions.AttributeFaasInvokedName, context.FunctionDefinition.Name);
activity.SetTag(TraceSemanticConventions.AttributeFaasExecution, context.InvocationId);
activity.SetTag(TraceSemanticConventions.AttributeFaasTrigger, triggerType);
activity.SetTag(FunctionActivityConstants.Entrypoint, context.FunctionDefinition.EntryPoint);
activity.SetTag(FunctionActivityConstants.Id, context.FunctionDefinition.Id);
}
}
finally
{
activity?.Dispose();
}
}

private async Task<Activity?> PassthroughHandlerAsync(FunctionContext context, FunctionExecutionDelegate next)
{
Expand Down
21 changes: 21 additions & 0 deletions Source/Morris.Azure.Functions.OpenTelemetry/CronTriggerHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Middleware;
using OpenTelemetry.Trace;
using System.Diagnostics;

namespace Morris.Azure.Functions.OpenTelemetry;

public class CronTriggerHandler : ITriggerHandler
{
public async Task<Activity?> HandleAsync(ActivitySource activitySource, TriggerParameterInfo triggerParameterInfo, FunctionContext context, FunctionExecutionDelegate next)
{
string activityName = context.FunctionDefinition.Name;
Activity? activity = activitySource.StartActivity(name: activityName, kind: ActivityKind.Server);

var schedule = ((TimerTriggerAttribute)triggerParameterInfo.BindingAttribute).Schedule;
activity?.SetTag(TraceSemanticConventions.AttributeFaasCron, schedule);

await next(context);
return activity;
}
}
63 changes: 36 additions & 27 deletions Source/Morris.Azure.Functions.OpenTelemetry/HttpTriggerHandler.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.Azure.Functions.Worker.Middleware;
using Microsoft.Azure.Functions.Worker;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Trace;
using OpenTelemetry;
Expand Down Expand Up @@ -31,34 +31,43 @@ public class HttpTriggerHandler : ITriggerHandler
carrier: requestData.Headers,
getter: ExtractContextFromHeaderCollection);

string activityName = $"{requestData.Method} {context.FunctionDefinition.Name}";
Activity? activity = activitySource
.StartActivity(
name: activityName,
kind: ActivityKind.Server,
string activityName = $"{requestData.Method} {context.FunctionDefinition.Name}";
Activity? activity = null;

if (newPropagationContext.ActivityContext.TraceFlags == ActivityTraceFlags.Recorded)
{
activity = activitySource.StartActivity(
name: activityName,
kind: ActivityKind.Server,
newPropagationContext.ActivityContext);
}
else
{
activity = activitySource.StartActivity(
name: activityName,
kind: ActivityKind.Server);

}

string? route = requestData.Url.AbsolutePath;
activity?.SetTag(TraceSemanticConventions.AttributeHttpRoute, route);
activity?.SetTag(TraceSemanticConventions.AttributeHttpMethod, requestData.Method);
activity?.SetTag(TraceSemanticConventions.AttributeHttpTarget, requestData.Url);
activity?.SetTag(TraceSemanticConventions.AttributeNetHostName, requestData.Url.Host);
activity?.SetTag(TraceSemanticConventions.AttributeNetHostPort, requestData.Url.Port);
activity?.SetTag(TraceSemanticConventions.AttributeHttpScheme, requestData.Url.Scheme);
activity?.SetTag(TraceSemanticConventions.AttributeHttpRequestContentLength, requestData.Body.Length);
string? route = requestData.Url.AbsolutePath;
activity?.SetTag(TraceSemanticConventions.AttributeHttpRoute, route);
activity?.SetTag(TraceSemanticConventions.AttributeHttpMethod, requestData.Method);
activity?.SetTag(TraceSemanticConventions.AttributeHttpTarget, requestData.Url);
activity?.SetTag(TraceSemanticConventions.AttributeNetHostName, requestData.Url.Host);
activity?.SetTag(TraceSemanticConventions.AttributeNetHostPort, requestData.Url.Port);
activity?.SetTag(TraceSemanticConventions.AttributeHttpScheme, requestData.Url.Scheme);

try
{
await next(context);
}
finally
{
if (context.GetHttpResponseData() is { } responseData)
{
activity?.SetTag(TraceSemanticConventions.AttributeHttpStatusCode, responseData.StatusCode);
activity?.SetTag(TraceSemanticConventions.AttributeHttpResponseContentLength, responseData.Body.Length);
}
}
try
{
await next(context);
}
finally
{
if (context.GetHttpResponseData() is { } responseData)
{
activity?.SetTag(TraceSemanticConventions.AttributeHttpStatusCode, (int)responseData.StatusCode);
}
}

return activity;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.22.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.2.0" />
<PackageReference Include="OpenTelemetry" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.8.1" />
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.21.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.1.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.ServiceBus" Version="5.17.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Timer" Version="4.2.0" />
<PackageReference Include="OpenTelemetry" Version="1.9.0" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.9.0" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.9.0" />
<PackageReference Include="OpenTelemetry.SemanticConventions" Version="1.0.0-rc9.9" />
</ItemGroup>
<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ private static void RegisterHandlersForWellKnownTriggers(IFunctionsWorkerApplica
builder.Services.TryAddSingleton<HttpTriggerHandler>();
builder.Services.TryAddSingleton<ActivityTriggerHandler>();
builder.Services.TryAddSingleton<OrchestrationTriggerHandler>();
builder.Services.TryAddSingleton<ServiceBusTriggerHandler>();
builder.Services.TryAddSingleton<CronTriggerHandler>();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Middleware;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Trace;
using System.Diagnostics;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace Morris.Azure.Functions.OpenTelemetry;

public class ServiceBusTriggerHandler : ITriggerHandler
{
public async Task<Activity?> HandleAsync(ActivitySource activitySource, TriggerParameterInfo triggerParameterInfo, FunctionContext context, FunctionExecutionDelegate next)
{
string? applicationPropertiesJson = context.BindingContext.BindingData["ApplicationProperties"]?.ToString();
ApplicationProperties? applicationProperties = null;

if (applicationPropertiesJson != null)
{
applicationProperties = JsonSerializer.Deserialize<ApplicationProperties>(applicationPropertiesJson);
}

ActivityContext currentActivityContext = Activity.Current?.Context ?? new ActivityContext();
var propagationContext = new PropagationContext(currentActivityContext, Baggage.Current);

PropagationContext newPropagationContext = Propagators
.DefaultTextMapPropagator
.Extract(context: propagationContext, carrier: applicationProperties, getter: ExtractContextFromApplicationProperties);

string activityName = context.FunctionDefinition.Name;
Activity? activity = null;

if (newPropagationContext.ActivityContext.TraceFlags == ActivityTraceFlags.Recorded)
{
activity = activitySource.StartActivity(name: activityName, kind: ActivityKind.Server, newPropagationContext.ActivityContext);
}
else
{
activity = activitySource.StartActivity(name: activityName, kind: ActivityKind.Server);
}

activity?.SetTag("messaging.system", "servicebus");
activity?.SetTag("messaging.operation.type", "receive");
activity?.SetTag("messaging.servicebus.message.enqueued_time", context.BindingContext.BindingData["EnqueuedTimeUtc"]?.ToString());
activity?.SetTag("messaging.message.id", context.BindingContext.BindingData["MessageId"]?.ToString());
activity?.SetTag("messaging.message.sequence_number", context.BindingContext.BindingData["SequenceNumber"]?.ToString());
activity?.SetTag("messaging.servicebus.message.delivery_count", context.BindingContext.BindingData["DeliveryCount"]?.ToString());

await next(context);
return activity;
}

internal class ApplicationProperties()
{
[JsonPropertyName("Diagnostic-Id")]
public string? DiagnosticId { get; set; }
}

internal static IEnumerable<string> ExtractContextFromApplicationProperties(ApplicationProperties? applicationProperties, string key) =>
key == "traceparent" && applicationProperties?.DiagnosticId != null ? new List<string>() { applicationProperties.DiagnosticId } : [];
}

0 comments on commit 5e213fa

Please sign in to comment.