Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix tracing response #1866

Merged
merged 4 commits into from
Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion src/Proto.OpenTelemetry/OpenTelemetryDecorators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Trace;
using Proto.Extensions;
using Proto.Mailbox;

namespace Proto.OpenTelemetry;
Expand Down Expand Up @@ -107,6 +108,10 @@ public override void Forward(PID target) =>
public override Task Receive(MessageEnvelope envelope) =>
OpenTelemetryMethodsDecorators.Receive(Source, envelope, _receiveActivitySetup,
() => base.Receive(envelope));

public override void Respond(object message)=>
OpenTelemetryMethodsDecorators.Respond(Source, base.Sender!, message, _receiveActivitySetup,
() => base.Respond(message));
}

internal static class OpenTelemetryMethodsDecorators
Expand Down Expand Up @@ -194,7 +199,9 @@ internal static async Task<T> RequestAsync<T>(string source, PID target, object
{
activity?.SetTag(ProtoTags.TargetPID, target.ToString());

return await requestAsync().ConfigureAwait(false);
var res= await requestAsync().ConfigureAwait(false);
activity?.SetTag(ProtoTags.ResponseMessageType, res.GetMessageTypeName());
return res;
}
catch (Exception ex)
{
Expand Down Expand Up @@ -226,6 +233,29 @@ internal static void Forward(string source, PID target, object message, Activity
throw;
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static void Respond(string source, PID target, object message, ActivitySetup sendActivitySetup,
Action respond)
{
using var activity =
OpenTelemetryHelpers.BuildStartedActivity(Activity.Current?.Context ?? default, source, nameof(Forward),
message, sendActivitySetup);

try
{
activity?.SetTag(ProtoTags.TargetPID, target.ToString());
activity?.SetTag(ProtoTags.ResponseMessageType, message.GetMessageTypeName());
respond();
}
catch (Exception ex)
{
activity?.RecordException(ex);
activity?.SetStatus(Status.Error);

throw;
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static async Task Receive(string source, MessageEnvelope envelope, ActivitySetup receiveActivitySetup,
Expand Down
5 changes: 5 additions & 0 deletions src/Proto.OpenTelemetry/ProtoTags.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ public static class ProtoTags
/// GetType().Name on the message
/// </summary>
public const string MessageType = "proto.messagetype";

/// <summary>
/// GetType().Name on the response message
/// </summary>
public const string ResponseMessageType = "proto.responsemessagetype";

/// <summary>
/// Message destination PID string representation
Expand Down
34 changes: 26 additions & 8 deletions tests/Proto.Cluster.Tests/ClusterFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using Proto.OpenTelemetry;
using Proto.Remote;
using Proto.Remote.GrpcNet;
using Proto.Utils;
using Xunit;

// ReSharper disable ClassNeverInstantiated.Global
Expand Down Expand Up @@ -115,8 +116,7 @@ public async Task DisposeAsync()
{
await WaitForMembersToShutdown();
}

_tracerProvider?.Dispose();

Members.Clear(); // prevent multiple shutdown attempts if dispose is called multiple times
}
catch (Exception e)
Expand All @@ -140,7 +140,13 @@ private async Task WaitForMembersToShutdown()
try
{
_logger.LogInformation("Shutting down cluster member {MemberId}", cluster.System.Id);
await task;

var done = await task.WaitUpTo(TimeSpan.FromSeconds(5));
if (! done)
{
_logger.LogWarning("Failed to shutdown cluster member {MemberId} gracefully", cluster.System.Id);
}

}
catch (Exception e)
{
Expand Down Expand Up @@ -234,11 +240,16 @@ private static void InitOpenTelemetryTracing()
private async Task<IList<Cluster>> SpawnClusterNodes(
int count,
Func<ClusterConfig, ClusterConfig>? configure = null
) =>
(await Task.WhenAll(
Enumerable.Range(0, count)
.Select(_ => SpawnClusterMember(configure))
)).ToList();
)
{
var tasks = Enumerable.Range(0, count)
.Select(_ => SpawnClusterMember(configure));

var res = (await Task.WhenAll(tasks)).ToList();
await res.First().MemberList.TopologyConsensus(CancellationTokens.FromSeconds(10));

return res;
}

protected virtual async Task<Cluster> SpawnClusterMember(Func<ClusterConfig, ClusterConfig>? configure)
{
Expand Down Expand Up @@ -281,6 +292,13 @@ protected virtual ActorSystemConfig GetActorSystemConfig()
return EnableTracing
? actorSystemConfig
.WithConfigureProps(props => props.WithTracing())
.WithConfigureSystemProps((name,props) =>
{
if (name == "$gossip")
return props;

return props.WithTracing();
})
.WithConfigureRootContext(context => context.WithTracing())
: actorSystemConfig;
}
Expand Down
8 changes: 4 additions & 4 deletions tests/Proto.Cluster.Tests/ClusterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -401,10 +401,10 @@ await Tracing.Trace(async () =>

var timer = Stopwatch.StartNew();

await Task.WhenAll(Members.SelectMany(member =>
GetActorIds(actorCount).Select(id => PingPong(member, id, timeout, kind))
)
);
var tasks = Members.SelectMany(member =>
GetActorIds(actorCount).Select(id => PingPong(member, id, timeout, kind))).ToList();

await Task.WhenAll(tasks);

timer.Stop();
_testOutputHelper.WriteLine($"Spawned {actorCount} actors across {Members.Count} nodes in {timer.Elapsed}");
Expand Down
6 changes: 5 additions & 1 deletion tests/Proto.Cluster.Tests/Tracing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ public static async Task Trace(Func<Task> callBack, ITestOutputHelper testOutput
if (activity is not null)
{
activity.AddTag("test.name", callerName);
testOutputHelper.WriteLine("http://localhost:5001/logs?traceId={0}", activity.TraceId);
testOutputHelper.WriteLine("http://localhost:5001/logs?traceId={0}", activity.TraceId.ToString().ToUpperInvariant());
}
else
{
testOutputHelper.WriteLine("No active trace span");
}

try
Expand Down
7 changes: 5 additions & 2 deletions tests/Proto.OpenTelemetry.Tests/OpenTelemetryTracingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,11 @@ private void TracesPropagateCorrectly(ActivitySpanId outerSpanId, ActivityTraceI
outerSpan.Should().NotBeNull();
outerSpan!.SpanId.Should().Be(outerSpanId);
outerSpan.OperationName.Should().Be(nameof(Trace));
var inner = activities.Last();
inner.Tags.Should().Contain(new KeyValuePair<string, string?>("inner", "true"));
//get second last activity

var inner = activities.LastOrDefault(s => s.Tags.Contains(new KeyValuePair<string, string?>("inner", "true")));

inner.Should().NotBeNull();
}

private async Task VerifyTrace(Func<IRootContext, PID, Task> action)
Expand Down