Skip to content

Commit

Permalink
added Schema and TraceActivityConstants
Browse files Browse the repository at this point in the history
  • Loading branch information
bachuv committed Jan 30, 2025
1 parent 2afb232 commit 5db7ccf
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 13 deletions.
27 changes: 27 additions & 0 deletions src/WebJobs.Extensions.DurableTask/Correlation/Schema.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation
{
internal static class Schema
{
internal static class Task
{
internal const string Type = "durabletask.type";
internal const string Name = "durabletask.task.name";
internal const string Version = "durabletask.task.version";
internal const string InstanceId = "durabletask.task.instance_id";
internal const string ExecutionId = "durabletask.task.execution_id";
internal const string Status = "durabletask.task.status";
internal const string TaskId = "durabletask.task.task_id";
internal const string EventTargetInstanceId = "durabletask.event.target_instance_id";
internal const string FireAt = "durabletask.fire_at";
}

internal static class Status
{
internal const string Code = "otel.status_code";
internal const string Description = "otel.status_description";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,24 @@ public TelemetryActivator(IOptions<DurableTaskOptions> options, INameResolver na
/// <inheritdoc/>
public ValueTask DisposeAsync()
{
return this.telemetryModule?.DisposeAsync() ?? default;
if (this.telemetryModule != null)
{
this.telemetryModule.DisposeAsync();
}

if (this.webJobsTelemetryModule != null)
{
this.webJobsTelemetryModule.DisposeAsync();
}

return default;
}

/// <inheritdoc/>
public void Dispose()
{
this.telemetryModule?.DisposeAsync().AsTask().GetAwaiter().GetResult();
this.webJobsTelemetryModule?.DisposeAsync().AsTask().GetAwaiter().GetResult();
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation
{
internal class TraceActivityConstants
{
public const string Client = "client";
public const string Orchestration = "orchestration";
public const string Activity = "activity";
public const string Event = "event";
public const string Timer = "timer";

public const string CreateOrchestration = "create_orchestration";
public const string OrchestrationEvent = "orchestration_event";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private void OnEndActivity(Activity activity)
this.telemetryClient.Track(telemetry);
}

static OperationTelemetry CreateTelemetry(Activity activity)
private static OperationTelemetry CreateTelemetry(Activity activity)
{
OperationTelemetry telemetry;
ActivityStatusCode status = activity.Status;
Expand Down Expand Up @@ -76,7 +76,7 @@ static OperationTelemetry CreateTelemetry(Activity activity)
return telemetry;
}

static T CreateTelemetryCore<T>(Activity activity)
private static T CreateTelemetryCore<T>(Activity activity)
where T : OperationTelemetry, new()
{
T telemetry = new()
Expand Down Expand Up @@ -112,7 +112,7 @@ async ValueTask IAsyncDisposable.DisposeAsync()
this.listener?.Dispose();
if (this.telemetryClient != null)
{
using CancellationTokenSource cts = new(millisecondsDelay: 5000);
using CancellationTokenSource cts = new (millisecondsDelay: 5000);
try
{
await this.telemetryClient.FlushAsync(cts.Token);
Expand Down
25 changes: 16 additions & 9 deletions src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.ApplicationInsights.Extensibility.Implementation;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using DTCore = DurableTask.Core;
Expand Down Expand Up @@ -178,13 +179,11 @@ public override Task<Empty> Hello(Empty request, ServerCallContext context)
string? traceParent = request.ParentTraceContext.TraceParent;
string? traceState = request.ParentTraceContext.TraceState;

// Create a new activity with the parent context
ActivityContext.TryParse(traceParent, traceState, out ActivityContext parentActivityContext);

// Create a new activity with the parent context above

Activity? scheduleOrchestrationActivity = StartActivityForNewOrchestration(executionStartedEvent, parentActivityContext);

// Use IOrchestrationService* APIs to start the orchestration
// Schedule the orchestration
try
{
await this.GetDurabilityProviderToScheduleOrchestration(context).CreateTaskOrchestrationAsync(
Expand Down Expand Up @@ -248,15 +247,14 @@ private OrchestrationStatus[] GetStatusesNotToOverride()

// Start the new activity to represent scheduling the orchestration
Activity? newActivity = activitySource.CreateActivity(
name: "create_orchestration from LocalGrpcListener", // CreateSpanName(TraceActivityConstants.CreateOrchestration, startEvent.Name, startEvent.Version),
name: CreateSpanName(TraceActivityConstants.CreateOrchestration, startEvent.Name, startEvent.Version),
kind: ActivityKind.Producer,
parentContext: parentTraceContext);

newActivity?.Start();

if (newActivity != null && !string.IsNullOrEmpty(newActivity.Id))
{
/*
newActivity.SetTag(Schema.Task.Type, TraceActivityConstants.Orchestration);
newActivity.SetTag(Schema.Task.Name, startEvent.Name);
newActivity.SetTag(Schema.Task.InstanceId, startEvent.OrchestrationInstance.InstanceId);
Expand All @@ -266,9 +264,6 @@ private OrchestrationStatus[] GetStatusesNotToOverride()
{
newActivity.SetTag(Schema.Task.Version, startEvent.Version);
}
*/

// startEvent.SetParentTraceContext(newActivity);

// Set the parent trace context for the ExecutionStartedEvent
startEvent.ParentTraceContext = new DTCore.Tracing.DistributedTraceContext(newActivity?.Id!, newActivity?.TraceStateString);
Expand All @@ -277,6 +272,18 @@ private OrchestrationStatus[] GetStatusesNotToOverride()
return newActivity;
}

private static string CreateSpanName(string spanDescription, string? taskName, string? taskVersion)
{
if (!string.IsNullOrEmpty(taskVersion))
{
return $"{spanDescription}:{taskName}@({taskVersion})";
}
else
{
return $"{spanDescription}:{taskName}";
}
}

public async override Task<P.RaiseEventResponse> RaiseEvent(P.RaiseEventRequest request, ServerCallContext context)
{
await this.GetClient(context).RaiseEventAsync(request.InstanceId, request.Name, Raw(request.Input));
Expand Down

0 comments on commit 5db7ccf

Please sign in to comment.