Skip to content

Commit 98b6687

Browse files
vandonrandrewlock
andauthored
refactor SQS send/receive instrumentation code (#5120)
* DSM checkpoint for SQS + pathway context propagation * fix possible nullrefs * missing nullability file * fix tostring on mockspan without delimiter, it means all spans are mixed when printed in a list * fix a couple issues with the ducktyping * missing nullability * read timestamp from message + DSM ITest * fix missing null file * fix indentation * log info about error before exiting in SQS sample program * use a collection to prevent parallel execution * improve diff printing in CI * probably need different snapshots depending on fwk * remove silent catch clause ? * wait for datastream points * make test more stable * add sent time in sync receive * cleanup * putting some changes aside for an other PR * address easy comments * comment to explain a bit what happens in the sample app * unify datastreams stats normalization between kafka and sqs * make sure we don't add requested attributes twice to receive requests * fix compilaiton for netframework * actually, keep the same logic as before when normalizing datastreams * pull object creation out of the loop * snapshot update after fix * bugfix * bugfix #2 where I was querying the wrong type * nit: WaitForDataStreamsPoints should stop if it has more points than expected * factorize SQS send/receive instrumentation code * replace extra calls to GetQueueName * support null DSM in Inject * fix static values that were recreated every time * expand string formating in stringbuilder Co-authored-by: Andrew Lock <andrew.lock@datadoghq.com> * nullable enable some files * put adapters in their own file + named methods to acess them * add a bunch of nullchecks * parse headers json only once in ctor --------- Co-authored-by: Andrew Lock <andrew.lock@datadoghq.com>
1 parent f265950 commit 98b6687

File tree

8 files changed

+200
-266
lines changed

8 files changed

+200
-266
lines changed
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
// <copyright file="AwsSqsHandlerCommon.cs" company="Datadog">
2+
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
4+
// </copyright>
5+
6+
#nullable enable
7+
8+
using System;
9+
using Datadog.Trace.ClrProfiler.CallTarget;
10+
using Datadog.Trace.DataStreamsMonitoring;
11+
using Datadog.Trace.DuckTyping;
12+
using Datadog.Trace.Vendors.Newtonsoft.Json.Utilities;
13+
14+
namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.SQS;
15+
16+
/// <summary>
17+
/// Contains the code that is shared between the integrations of sync/async and batch/single send and receive.
18+
/// </summary>
19+
internal static class AwsSqsHandlerCommon
20+
{
21+
internal static CallTargetState BeforeSend<TSendMessageRequest>(TSendMessageRequest request, SendType sendType)
22+
{
23+
if (request is null)
24+
{
25+
return CallTargetState.GetDefault();
26+
}
27+
28+
// we can't use generic constraints for this duck typing, because we need the original type
29+
// for the InjectHeadersIntoMessage<TSendMessageRequest> call below
30+
var requestProxy = request.DuckCast<IAmazonSQSRequestWithQueueUrl>();
31+
32+
var scope = AwsSqsCommon.CreateScope(Tracer.Instance, sendType.OperationName, out var tags, spanKind: SpanKinds.Producer);
33+
34+
var queueName = AwsSqsCommon.GetQueueName(requestProxy.QueueUrl);
35+
if (tags is not null && requestProxy.QueueUrl is not null)
36+
{
37+
tags.QueueUrl = requestProxy.QueueUrl;
38+
tags.QueueName = queueName;
39+
}
40+
41+
if (scope?.Span.Context != null && !string.IsNullOrEmpty(queueName))
42+
{
43+
var dataStreamsManager = Tracer.Instance.TracerManager.DataStreamsManager;
44+
45+
if (sendType == SendType.SingleMessage)
46+
{
47+
InjectForSingleMessage(dataStreamsManager, request, scope, queueName);
48+
}
49+
else if (sendType == SendType.Batch)
50+
{
51+
InjectForBatch(dataStreamsManager, request, scope, queueName);
52+
}
53+
}
54+
55+
return new CallTargetState(scope);
56+
}
57+
58+
private static void InjectForSingleMessage<TSendMessageRequest>(DataStreamsManager? dataStreamsManager, TSendMessageRequest request, Scope scope, string queueName)
59+
{
60+
var requestProxy = request.DuckCast<ISendMessageRequest>();
61+
if (requestProxy == null)
62+
{
63+
return;
64+
}
65+
66+
if (dataStreamsManager != null && dataStreamsManager.IsEnabled)
67+
{
68+
var edgeTags = new[] { "direction:out", $"topic:{queueName}", "type:sqs" };
69+
scope.Span.SetDataStreamsCheckpoint(dataStreamsManager, CheckpointKind.Produce, edgeTags, payloadSizeBytes: 0, timeInQueueMs: 0);
70+
}
71+
72+
ContextPropagation.InjectHeadersIntoMessage<TSendMessageRequest>(requestProxy, scope.Span.Context, dataStreamsManager);
73+
}
74+
75+
private static void InjectForBatch<TSendMessageBatchRequest>(DataStreamsManager? dataStreamsManager, TSendMessageBatchRequest request, Scope scope, string queueName)
76+
{
77+
var requestProxy = request.DuckCast<ISendMessageBatchRequest>();
78+
if (requestProxy == null)
79+
{
80+
return;
81+
}
82+
83+
var edgeTags = new[] { "direction:out", $"topic:{queueName}", "type:sqs" };
84+
foreach (var e in requestProxy.Entries)
85+
{
86+
var entry = e.DuckCast<IContainsMessageAttributes>();
87+
if (entry != null)
88+
{
89+
// this has no effect is DSM is disabled
90+
scope.Span.SetDataStreamsCheckpoint(dataStreamsManager, CheckpointKind.Produce, edgeTags, payloadSizeBytes: 0, timeInQueueMs: 0);
91+
// this needs to be done for context propagation even when DSM is disabled
92+
// (when DSM is enabled, it injects the pathway context on top of the trace context)
93+
ContextPropagation.InjectHeadersIntoMessage<TSendMessageBatchRequest>(entry, scope.Span.Context, dataStreamsManager);
94+
}
95+
}
96+
}
97+
98+
internal static TResponse AfterSend<TResponse>(TResponse response, Exception? exception, in CallTargetState state)
99+
{
100+
state.Scope.DisposeWithException(exception);
101+
return response;
102+
}
103+
104+
internal static CallTargetState BeforeReceive(IReceiveMessageRequest request)
105+
{
106+
if (request.Instance is null)
107+
{
108+
return CallTargetState.GetDefault();
109+
}
110+
111+
var queueName = AwsSqsCommon.GetQueueName(request.QueueUrl);
112+
var scope = AwsSqsCommon.CreateScope(Tracer.Instance, "ReceiveMessage", out var tags, spanKind: SpanKinds.Consumer);
113+
if (tags is not null && request.QueueUrl is not null)
114+
{
115+
tags.QueueUrl = request.QueueUrl;
116+
tags.QueueName = queueName;
117+
}
118+
119+
// request the message attributes that a datadog instrumentation might have set when sending
120+
request.MessageAttributeNames.AddDistinct(ContextPropagation.SqsKey);
121+
request.AttributeNames.AddDistinct("SentTimestamp");
122+
123+
return new CallTargetState(scope, queueName);
124+
}
125+
126+
internal static TResponse AfterReceive<TResponse>(TResponse response, Exception? exception, in CallTargetState state)
127+
where TResponse : IReceiveMessageResponse
128+
{
129+
if (response.Instance != null && response.Messages != null && response.Messages.Count > 0 && state.State != null)
130+
{
131+
var dataStreamsManager = Tracer.Instance.TracerManager.DataStreamsManager;
132+
if (dataStreamsManager != null && dataStreamsManager.IsEnabled)
133+
{
134+
var edgeTags = new[] { "direction:in", $"topic:{(string)state.State}", "type:sqs" };
135+
foreach (var o in response.Messages)
136+
{
137+
var message = o.DuckCast<IMessage>();
138+
if (message == null)
139+
{
140+
continue; // should not happen
141+
}
142+
143+
var sentTime = 0;
144+
if (message.Attributes != null && message.Attributes.TryGetValue("SentTimestamp", out var sentTimeStr) && sentTimeStr != null)
145+
{
146+
int.TryParse(sentTimeStr, out sentTime);
147+
}
148+
149+
var adapter = AwsSqsHeadersAdapters.GetExtractionAdapter(message.MessageAttributes);
150+
var parentPathway = dataStreamsManager.ExtractPathwayContext(adapter);
151+
state.Scope.Span.SetDataStreamsCheckpoint(dataStreamsManager, CheckpointKind.Consume, edgeTags, payloadSizeBytes: 0, sentTime, parentPathway);
152+
}
153+
}
154+
}
155+
156+
state.Scope.DisposeWithException(exception);
157+
return response;
158+
}
159+
160+
public class SendType
161+
{
162+
public static readonly SendType SingleMessage = new("SendMessage");
163+
164+
public static readonly SendType Batch = new("SendMessageBatch");
165+
166+
private SendType(string value)
167+
{
168+
OperationName = value;
169+
}
170+
171+
public string OperationName { get; }
172+
173+
public override string ToString()
174+
{
175+
return OperationName;
176+
}
177+
}
178+
}

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/ReceiveMessageAsyncIntegration.cs

Lines changed: 3 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@
99
using System.ComponentModel;
1010
using System.Threading;
1111
using Datadog.Trace.ClrProfiler.CallTarget;
12-
using Datadog.Trace.DataStreamsMonitoring;
13-
using Datadog.Trace.DuckTyping;
14-
using Datadog.Trace.Vendors.Newtonsoft.Json.Utilities;
1512

1613
namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.SQS
1714
{
@@ -31,8 +28,6 @@ namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.SQS
3128
[EditorBrowsable(EditorBrowsableState.Never)]
3229
public class ReceiveMessageAsyncIntegration
3330
{
34-
private const string Operation = "ReceiveMessage";
35-
3631
/// <summary>
3732
/// OnMethodBegin callback
3833
/// </summary>
@@ -45,24 +40,7 @@ public class ReceiveMessageAsyncIntegration
4540
internal static CallTargetState OnMethodBegin<TTarget, TReceiveMessageRequest>(TTarget instance, TReceiveMessageRequest request, CancellationToken cancellationToken)
4641
where TReceiveMessageRequest : IReceiveMessageRequest
4742
{
48-
if (request.Instance is null)
49-
{
50-
return CallTargetState.GetDefault();
51-
}
52-
53-
var queueName = AwsSqsCommon.GetQueueName(request.QueueUrl);
54-
var scope = AwsSqsCommon.CreateScope(Tracer.Instance, Operation, out var tags, spanKind: SpanKinds.Consumer);
55-
if (tags is not null && request.QueueUrl is not null)
56-
{
57-
tags.QueueUrl = request.QueueUrl;
58-
tags.QueueName = queueName;
59-
}
60-
61-
// request the message attributes that a datadog instrumentation might have set when sending
62-
request.MessageAttributeNames.AddDistinct(ContextPropagation.SqsKey);
63-
request.AttributeNames.AddDistinct("SentTimestamp");
64-
65-
return new CallTargetState(scope, queueName);
43+
return AwsSqsHandlerCommon.BeforeReceive(request);
6644
}
6745

6846
/// <summary>
@@ -75,38 +53,10 @@ internal static CallTargetState OnMethodBegin<TTarget, TReceiveMessageRequest>(T
7553
/// <param name="exception">Exception instance in case the original code threw an exception.</param>
7654
/// <param name="state">Calltarget state value</param>
7755
/// <returns>A response value, in an async scenario will be T of Task of T</returns>
78-
internal static TResponse OnAsyncMethodEnd<TTarget, TResponse>(TTarget instance, TResponse response, Exception exception, in CallTargetState state)
56+
internal static TResponse OnAsyncMethodEnd<TTarget, TResponse>(TTarget instance, TResponse response, Exception? exception, in CallTargetState state)
7957
where TResponse : IReceiveMessageResponse
8058
{
81-
if (response.Instance != null && response.Messages != null && response.Messages.Count > 0 && state.State != null)
82-
{
83-
var dataStreamsManager = Tracer.Instance.TracerManager.DataStreamsManager;
84-
if (dataStreamsManager != null && dataStreamsManager.IsEnabled)
85-
{
86-
var edgeTags = new[] { "direction:in", $"topic:{(string)state.State}", "type:sqs" };
87-
foreach (var o in response.Messages)
88-
{
89-
var message = o.DuckCast<IMessage>();
90-
if (message == null)
91-
{
92-
continue; // should not happen
93-
}
94-
95-
var sentTime = 0;
96-
if (message.Attributes != null && message.Attributes.TryGetValue("SentTimestamp", out var sentTimeStr) && sentTimeStr != null)
97-
{
98-
int.TryParse(sentTimeStr, out sentTime);
99-
}
100-
101-
var adapter = AwsSqsHeadersAdapters.GetExtractionAdapter(message.MessageAttributes);
102-
var parentPathway = dataStreamsManager.ExtractPathwayContext(adapter);
103-
state.Scope.Span.SetDataStreamsCheckpoint(dataStreamsManager, CheckpointKind.Consume, edgeTags, payloadSizeBytes: 0, sentTime, parentPathway);
104-
}
105-
}
106-
}
107-
108-
state.Scope.DisposeWithException(exception);
109-
return response;
59+
return AwsSqsHandlerCommon.AfterReceive(response, exception, in state);
11060
}
11161
}
11262
}

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/ReceiveMessageIntegration.cs

Lines changed: 3 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@
88
using System;
99
using System.ComponentModel;
1010
using Datadog.Trace.ClrProfiler.CallTarget;
11-
using Datadog.Trace.DataStreamsMonitoring;
12-
using Datadog.Trace.DuckTyping;
13-
using Datadog.Trace.Vendors.Newtonsoft.Json.Utilities;
1411

1512
namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.SQS
1613
{
@@ -30,8 +27,6 @@ namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.SQS
3027
[EditorBrowsable(EditorBrowsableState.Never)]
3128
public class ReceiveMessageIntegration
3229
{
33-
private const string Operation = "ReceiveMessage";
34-
3530
/// <summary>
3631
/// OnMethodBegin callback
3732
/// </summary>
@@ -43,24 +38,7 @@ public class ReceiveMessageIntegration
4338
internal static CallTargetState OnMethodBegin<TTarget, TReceiveMessageRequest>(TTarget instance, TReceiveMessageRequest request)
4439
where TReceiveMessageRequest : IReceiveMessageRequest
4540
{
46-
if (request.Instance is null)
47-
{
48-
return CallTargetState.GetDefault();
49-
}
50-
51-
var queueName = AwsSqsCommon.GetQueueName(request.QueueUrl);
52-
var scope = AwsSqsCommon.CreateScope(Tracer.Instance, Operation, out var tags, spanKind: SpanKinds.Consumer);
53-
if (tags is not null && request.QueueUrl is not null)
54-
{
55-
tags.QueueUrl = request.QueueUrl;
56-
tags.QueueName = queueName;
57-
}
58-
59-
// request the message attributes that a datadog instrumentation might have set when sending
60-
request.MessageAttributeNames.AddDistinct(ContextPropagation.SqsKey);
61-
request.AttributeNames.AddDistinct("SentTimestamp");
62-
63-
return new CallTargetState(scope, queueName);
41+
return AwsSqsHandlerCommon.BeforeReceive(request);
6442
}
6543

6644
/// <summary>
@@ -73,38 +51,10 @@ internal static CallTargetState OnMethodBegin<TTarget, TReceiveMessageRequest>(T
7351
/// <param name="exception">Exception instance in case the original code threw an exception.</param>
7452
/// <param name="state">Calltarget state value</param>
7553
/// <returns>A response value, in an async scenario will be T of Task of T</returns>
76-
internal static CallTargetReturn<TResponse> OnMethodEnd<TTarget, TResponse>(TTarget instance, TResponse response, Exception exception, in CallTargetState state)
54+
internal static CallTargetReturn<TResponse> OnMethodEnd<TTarget, TResponse>(TTarget instance, TResponse response, Exception? exception, in CallTargetState state)
7755
where TResponse : IReceiveMessageResponse
7856
{
79-
if (response.Instance != null && response.Messages != null && response.Messages.Count > 0 && state.State != null)
80-
{
81-
var dataStreamsManager = Tracer.Instance.TracerManager.DataStreamsManager;
82-
if (dataStreamsManager != null && dataStreamsManager.IsEnabled)
83-
{
84-
var edgeTags = new[] { "direction:in", $"topic:{(string)state.State}", "type:sqs" };
85-
foreach (var o in response.Messages)
86-
{
87-
var message = o.DuckCast<IMessage>();
88-
if (message == null)
89-
{
90-
continue; // should not happen
91-
}
92-
93-
var sentTime = 0;
94-
if (message.Attributes != null && message.Attributes.TryGetValue("SentTimestamp", out var sentTimeStr) && sentTimeStr != null)
95-
{
96-
int.TryParse(sentTimeStr, out sentTime);
97-
}
98-
99-
var adapter = AwsSqsHeadersAdapters.GetExtractionAdapter(message.MessageAttributes);
100-
var parentPathway = dataStreamsManager.ExtractPathwayContext(adapter);
101-
state.Scope.Span.SetDataStreamsCheckpoint(dataStreamsManager, CheckpointKind.Consume, edgeTags, payloadSizeBytes: 0, sentTime, parentPathway);
102-
}
103-
}
104-
}
105-
106-
state.Scope.DisposeWithException(exception);
107-
return new CallTargetReturn<TResponse>(response);
57+
return new CallTargetReturn<TResponse>(AwsSqsHandlerCommon.AfterReceive(response, exception, in state));
10858
}
10959
}
11060
}

0 commit comments

Comments
 (0)