Skip to content

Commit f265950

Browse files
authored
DSM support for SQS (#4973)
1 parent f03ab8d commit f265950

22 files changed

+693
-178
lines changed

tracer/missing-nullability-files.csv

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -728,9 +728,7 @@ src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SDK/IInvocationResponse.cs
728728
src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SDK/ILambdaContext.cs
729729
src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SNS/CachedMessageHeadersHelper.cs
730730
src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/CachedMessageHeadersHelper.cs
731-
src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/ContextPropagation.cs
732731
src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/IAmazonSQSRequestWithQueueUrl.cs
733-
src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/IContainsMessageAttributes.cs
734732
src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/ICreateQueueRequest.cs
735733
src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/ICreateQueueResponse.cs
736734
src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/ISendMessageBatchRequest.cs
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// <copyright file="AwsSqsHeadersAdapters.cs" company="Datadog">
2+
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
4+
// </copyright>
5+
6+
#nullable enable
7+
8+
using System;
9+
using System.Collections;
10+
using System.Collections.Generic;
11+
using System.Text;
12+
using Datadog.Trace.DuckTyping;
13+
using Datadog.Trace.Headers;
14+
using Datadog.Trace.Vendors.Newtonsoft.Json;
15+
16+
namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.SQS;
17+
18+
internal class AwsSqsHeadersAdapters
19+
{
20+
public static IBinaryHeadersCollection GetInjectionAdapter(StringBuilder carrier)
21+
{
22+
return new StringBuilderJsonAdapter(carrier);
23+
}
24+
25+
public static IBinaryHeadersCollection GetExtractionAdapter(IDictionary? messageAttributes)
26+
{
27+
return new MessageAttributesAdapter(messageAttributes);
28+
}
29+
30+
/// <summary>
31+
/// The adapter to use to append stuff to a string builder where a json is being built
32+
/// </summary>
33+
private readonly struct StringBuilderJsonAdapter : IBinaryHeadersCollection
34+
{
35+
private readonly StringBuilder _carrier;
36+
37+
public StringBuilderJsonAdapter(StringBuilder carrier)
38+
{
39+
_carrier = carrier;
40+
}
41+
42+
public byte[] TryGetLastBytes(string name)
43+
{
44+
throw new NotImplementedException("this adapter can only be use to write to a StringBuilder, not to read data");
45+
}
46+
47+
public void Add(string key, byte[] value)
48+
{
49+
_carrier
50+
.Append(value: '"')
51+
.Append(key)
52+
.Append("\":\"")
53+
.Append(Convert.ToBase64String(value))
54+
.Append("\",");
55+
}
56+
}
57+
58+
/// <summary>
59+
/// The adapter to use to read attributes packed in a json string under the _datadog key
60+
/// </summary>
61+
private readonly struct MessageAttributesAdapter : IBinaryHeadersCollection
62+
{
63+
private readonly Dictionary<string, string>? _ddAttributes;
64+
65+
public MessageAttributesAdapter(IDictionary? messageAttributes)
66+
{
67+
// IDictionary returns null if the key is not present
68+
var json = messageAttributes?[ContextPropagation.SqsKey]?.DuckCast<IMessageAttributeValue>();
69+
if (json != null && json.StringValue != null)
70+
{
71+
_ddAttributes = JsonConvert.DeserializeObject<Dictionary<string, string>>(json.StringValue);
72+
}
73+
}
74+
75+
public byte[] TryGetLastBytes(string name)
76+
{
77+
if (_ddAttributes != null && _ddAttributes.TryGetValue(name, out var b64))
78+
{
79+
return Convert.FromBase64String(b64);
80+
}
81+
82+
return Array.Empty<byte>();
83+
}
84+
85+
public void Add(string name, byte[] value)
86+
{
87+
throw new NotImplementedException("this is meant to read attributes only, not write them");
88+
}
89+
}
90+
}

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/ContextPropagation.cs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,33 +3,39 @@
33
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
44
// </copyright>
55

6+
#nullable enable
67
using System;
78
using System.Collections;
89
using System.Collections.Generic;
910
using System.Text;
1011
using Datadog.Trace.DataStreamsMonitoring;
12+
using Datadog.Trace.DuckTyping;
13+
using Datadog.Trace.ExtensionMethods;
14+
using Datadog.Trace.Headers;
1115
using Datadog.Trace.Propagators;
16+
using Datadog.Trace.Vendors.Newtonsoft.Json;
1217

1318
namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.SQS
1419
{
1520
internal static class ContextPropagation
1621
{
17-
private const string SqsKey = "_datadog";
22+
internal const string SqsKey = "_datadog";
1823

19-
private static void Inject<TMessageRequest>(SpanContext context, IDictionary messageAttributes)
24+
private static void Inject<TMessageRequest>(SpanContext context, IDictionary messageAttributes, DataStreamsManager? dataStreamsManager)
2025
{
2126
// Consolidate headers into one JSON object with <header_name>:<value>
2227
var sb = Util.StringBuilderCache.Acquire(Util.StringBuilderCache.MaxBuilderSize);
2328
sb.Append('{');
2429
SpanContextPropagator.Instance.Inject(context, sb, default(StringBuilderCarrierSetter));
30+
dataStreamsManager?.InjectPathwayContext(context.PathwayContext, AwsSqsHeadersAdapters.GetInjectionAdapter(sb));
2531
sb.Remove(startIndex: sb.Length - 1, length: 1); // Remove trailing comma
2632
sb.Append('}');
2733

2834
var resultString = Util.StringBuilderCache.GetStringAndRelease(sb);
2935
messageAttributes[SqsKey] = CachedMessageHeadersHelper<TMessageRequest>.CreateMessageAttributeValue(resultString);
3036
}
3137

32-
public static void InjectHeadersIntoMessage<TMessageRequest>(IContainsMessageAttributes carrier, SpanContext spanContext)
38+
public static void InjectHeadersIntoMessage<TMessageRequest>(IContainsMessageAttributes carrier, SpanContext spanContext, DataStreamsManager? dataStreamsManager)
3339
{
3440
// add distributed tracing headers to the message
3541
if (carrier.MessageAttributes == null)
@@ -40,7 +46,7 @@ public static void InjectHeadersIntoMessage<TMessageRequest>(IContainsMessageAtt
4046
{
4147
// In .NET Fx and Net Core 2.1, removing an element while iterating on keys throws.
4248
#if !NETCOREAPP2_1_OR_GREATER
43-
List<string> attributesToRemove = null;
49+
List<string>? attributesToRemove = null;
4450
#endif
4551
// Make sure we do not propagate any other datadog header here in the rare cases where users would have added them manually
4652
foreach (var attribute in carrier.MessageAttributes.Keys)
@@ -73,7 +79,7 @@ public static void InjectHeadersIntoMessage<TMessageRequest>(IContainsMessageAtt
7379
// Only inject if there's room
7480
if (carrier.MessageAttributes.Count < 10)
7581
{
76-
Inject<TMessageRequest>(spanContext, carrier.MessageAttributes);
82+
Inject<TMessageRequest>(spanContext, carrier.MessageAttributes, dataStreamsManager);
7783
}
7884
}
7985

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/IContainsMessageAttributes.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
44
// </copyright>
55

6+
#nullable enable
67
using System.Collections;
8+
using System.IO;
79

810
namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.SQS
911
{
@@ -15,6 +17,15 @@ internal interface IContainsMessageAttributes
1517
/// <summary>
1618
/// Gets or sets the message attributes
1719
/// </summary>
18-
IDictionary MessageAttributes { get; set; }
20+
IDictionary? MessageAttributes { get; set; } // <string, IMessageAttributeValue>
21+
}
22+
23+
internal interface IMessageAttributeValue
24+
{
25+
string? DataType { get; set; } // can be String, Number, or Binary
26+
27+
string? StringValue { get; set; } // filled if DataType is String or Number
28+
29+
MemoryStream? BinaryValue { get; set; } // filled if DataType is Binary
1930
}
2031
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// <copyright file="IReceiveMessageRequest.cs" company="Datadog">
2+
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
4+
// </copyright>
5+
6+
#nullable enable
7+
8+
using System.Collections.Generic;
9+
using Datadog.Trace.DuckTyping;
10+
11+
namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.SQS;
12+
13+
internal interface IReceiveMessageRequest : IAmazonSQSRequestWithQueueUrl, IDuckType
14+
{
15+
List<string?> MessageAttributeNames { get; }
16+
17+
List<string?> AttributeNames { get; }
18+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// <copyright file="IReceiveMessageResponse.cs" company="Datadog">
2+
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
4+
// </copyright>
5+
6+
#nullable enable
7+
8+
using System.Collections;
9+
using System.Collections.Generic;
10+
using Datadog.Trace.DuckTyping;
11+
12+
namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.SQS
13+
{
14+
internal interface IReceiveMessageResponse : IDuckType
15+
{
16+
IList? Messages { get; } // <IMessage>
17+
}
18+
19+
internal interface IMessage : IContainsMessageAttributes
20+
{
21+
Dictionary<string, string?>? Attributes { get; set; }
22+
}
23+
}

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/ReceiveMessageAsyncIntegration.cs

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@
99
using System.ComponentModel;
1010
using System.Threading;
1111
using Datadog.Trace.ClrProfiler.CallTarget;
12+
using Datadog.Trace.DataStreamsMonitoring;
1213
using Datadog.Trace.DuckTyping;
13-
using Datadog.Trace.Tagging;
14+
using Datadog.Trace.Vendors.Newtonsoft.Json.Utilities;
1415

1516
namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.SQS
1617
{
@@ -42,21 +43,26 @@ public class ReceiveMessageAsyncIntegration
4243
/// <param name="cancellationToken">CancellationToken value</param>
4344
/// <returns>Calltarget state value</returns>
4445
internal static CallTargetState OnMethodBegin<TTarget, TReceiveMessageRequest>(TTarget instance, TReceiveMessageRequest request, CancellationToken cancellationToken)
45-
where TReceiveMessageRequest : IAmazonSQSRequestWithQueueUrl, IDuckType
46+
where TReceiveMessageRequest : IReceiveMessageRequest
4647
{
4748
if (request.Instance is null)
4849
{
4950
return CallTargetState.GetDefault();
5051
}
5152

53+
var queueName = AwsSqsCommon.GetQueueName(request.QueueUrl);
5254
var scope = AwsSqsCommon.CreateScope(Tracer.Instance, Operation, out var tags, spanKind: SpanKinds.Consumer);
5355
if (tags is not null && request.QueueUrl is not null)
5456
{
5557
tags.QueueUrl = request.QueueUrl;
56-
tags.QueueName = AwsSqsCommon.GetQueueName(request.QueueUrl);
58+
tags.QueueName = queueName;
5759
}
5860

59-
return new CallTargetState(scope);
61+
// request the message attributes that a datadog instrumentation might have set when sending
62+
request.MessageAttributeNames.AddDistinct(ContextPropagation.SqsKey);
63+
request.AttributeNames.AddDistinct("SentTimestamp");
64+
65+
return new CallTargetState(scope, queueName);
6066
}
6167

6268
/// <summary>
@@ -70,7 +76,35 @@ internal static CallTargetState OnMethodBegin<TTarget, TReceiveMessageRequest>(T
7076
/// <param name="state">Calltarget state value</param>
7177
/// <returns>A response value, in an async scenario will be T of Task of T</returns>
7278
internal static TResponse OnAsyncMethodEnd<TTarget, TResponse>(TTarget instance, TResponse response, Exception exception, in CallTargetState state)
79+
where TResponse : IReceiveMessageResponse
7380
{
81+
if (response.Instance != null && response.Messages != null && response.Messages.Count > 0 && state.State != null)
82+
{
83+
var dataStreamsManager = Tracer.Instance.TracerManager.DataStreamsManager;
84+
if (dataStreamsManager != null && dataStreamsManager.IsEnabled)
85+
{
86+
var edgeTags = new[] { "direction:in", $"topic:{(string)state.State}", "type:sqs" };
87+
foreach (var o in response.Messages)
88+
{
89+
var message = o.DuckCast<IMessage>();
90+
if (message == null)
91+
{
92+
continue; // should not happen
93+
}
94+
95+
var sentTime = 0;
96+
if (message.Attributes != null && message.Attributes.TryGetValue("SentTimestamp", out var sentTimeStr) && sentTimeStr != null)
97+
{
98+
int.TryParse(sentTimeStr, out sentTime);
99+
}
100+
101+
var adapter = AwsSqsHeadersAdapters.GetExtractionAdapter(message.MessageAttributes);
102+
var parentPathway = dataStreamsManager.ExtractPathwayContext(adapter);
103+
state.Scope.Span.SetDataStreamsCheckpoint(dataStreamsManager, CheckpointKind.Consume, edgeTags, payloadSizeBytes: 0, sentTime, parentPathway);
104+
}
105+
}
106+
}
107+
74108
state.Scope.DisposeWithException(exception);
75109
return response;
76110
}

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/ReceiveMessageIntegration.cs

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@
88
using System;
99
using System.ComponentModel;
1010
using Datadog.Trace.ClrProfiler.CallTarget;
11+
using Datadog.Trace.DataStreamsMonitoring;
1112
using Datadog.Trace.DuckTyping;
12-
using Datadog.Trace.Tagging;
13+
using Datadog.Trace.Vendors.Newtonsoft.Json.Utilities;
1314

1415
namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.SQS
1516
{
@@ -40,21 +41,26 @@ public class ReceiveMessageIntegration
4041
/// <param name="request">The request for the SQS operation</param>
4142
/// <returns>Calltarget state value</returns>
4243
internal static CallTargetState OnMethodBegin<TTarget, TReceiveMessageRequest>(TTarget instance, TReceiveMessageRequest request)
43-
where TReceiveMessageRequest : IAmazonSQSRequestWithQueueUrl, IDuckType
44+
where TReceiveMessageRequest : IReceiveMessageRequest
4445
{
4546
if (request.Instance is null)
4647
{
4748
return CallTargetState.GetDefault();
4849
}
4950

51+
var queueName = AwsSqsCommon.GetQueueName(request.QueueUrl);
5052
var scope = AwsSqsCommon.CreateScope(Tracer.Instance, Operation, out var tags, spanKind: SpanKinds.Consumer);
5153
if (tags is not null && request.QueueUrl is not null)
5254
{
5355
tags.QueueUrl = request.QueueUrl;
54-
tags.QueueName = AwsSqsCommon.GetQueueName(request.QueueUrl);
56+
tags.QueueName = queueName;
5557
}
5658

57-
return new CallTargetState(scope);
59+
// request the message attributes that a datadog instrumentation might have set when sending
60+
request.MessageAttributeNames.AddDistinct(ContextPropagation.SqsKey);
61+
request.AttributeNames.AddDistinct("SentTimestamp");
62+
63+
return new CallTargetState(scope, queueName);
5864
}
5965

6066
/// <summary>
@@ -68,7 +74,35 @@ internal static CallTargetState OnMethodBegin<TTarget, TReceiveMessageRequest>(T
6874
/// <param name="state">Calltarget state value</param>
6975
/// <returns>A response value, in an async scenario will be T of Task of T</returns>
7076
internal static CallTargetReturn<TResponse> OnMethodEnd<TTarget, TResponse>(TTarget instance, TResponse response, Exception exception, in CallTargetState state)
77+
where TResponse : IReceiveMessageResponse
7178
{
79+
if (response.Instance != null && response.Messages != null && response.Messages.Count > 0 && state.State != null)
80+
{
81+
var dataStreamsManager = Tracer.Instance.TracerManager.DataStreamsManager;
82+
if (dataStreamsManager != null && dataStreamsManager.IsEnabled)
83+
{
84+
var edgeTags = new[] { "direction:in", $"topic:{(string)state.State}", "type:sqs" };
85+
foreach (var o in response.Messages)
86+
{
87+
var message = o.DuckCast<IMessage>();
88+
if (message == null)
89+
{
90+
continue; // should not happen
91+
}
92+
93+
var sentTime = 0;
94+
if (message.Attributes != null && message.Attributes.TryGetValue("SentTimestamp", out var sentTimeStr) && sentTimeStr != null)
95+
{
96+
int.TryParse(sentTimeStr, out sentTime);
97+
}
98+
99+
var adapter = AwsSqsHeadersAdapters.GetExtractionAdapter(message.MessageAttributes);
100+
var parentPathway = dataStreamsManager.ExtractPathwayContext(adapter);
101+
state.Scope.Span.SetDataStreamsCheckpoint(dataStreamsManager, CheckpointKind.Consume, edgeTags, payloadSizeBytes: 0, sentTime, parentPathway);
102+
}
103+
}
104+
}
105+
72106
state.Scope.DisposeWithException(exception);
73107
return new CallTargetReturn<TResponse>(response);
74108
}

0 commit comments

Comments
 (0)