Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serverless] Extract EventBridge from SNS/SQS Payloads and Create EventBridge Spans #29551

Merged
45 changes: 36 additions & 9 deletions pkg/serverless/invocationlifecycle/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ func (lp *LifecycleProcessor) initFromSNSEvent(event events.SNSEvent) {
lp.requestHandler.event = event
lp.addTag(tagFunctionTriggerEventSource, sns)
lp.addTag(tagFunctionTriggerEventSourceArn, trigger.ExtractSNSEventArn(event))

// Check for EventBridge event wrapped by the SNS message
var eventBridgeEvent events.EventBridgeEvent
if err := json.Unmarshal([]byte(event.Records[0].SNS.Message), &eventBridgeEvent); err == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an aside, I wish we didn't have to unmarshal these same event messages twice, once here and once during trace context extraction. Something to think about for a future enhancement.

if len(eventBridgeEvent.Detail.TraceContext) > 0 {
lp.createWrappedEventBridgeSpan(eventBridgeEvent)
}
}
}

func (lp *LifecycleProcessor) initFromSQSEvent(event events.SQSEvent) {
Expand All @@ -154,19 +162,26 @@ func (lp *LifecycleProcessor) initFromSQSEvent(event events.SQSEvent) {
lp.addTag(tagFunctionTriggerEventSource, sqs)
lp.addTag(tagFunctionTriggerEventSourceArn, trigger.ExtractSQSEventARN(event))

// test for SNS
// Check for SNS event wrapped by the SQS body
var snsEntity events.SNSEntity
if err := json.Unmarshal([]byte(event.Records[0].Body), &snsEntity); err != nil {
return
if err := json.Unmarshal([]byte(event.Records[0].Body), &snsEntity); err == nil {
if strings.ToLower(snsEntity.Type) == "notification" && snsEntity.TopicArn != "" {
lp.createWrappedSNSSpan(snsEntity)
return
}
}

isSNS := strings.ToLower(snsEntity.Type) == "notification" && snsEntity.TopicArn != ""

if !isSNS {
return
// Check for EventBridge event wrapped by the SQS body
var eventBridgeEvent events.EventBridgeEvent
if err := json.Unmarshal([]byte(event.Records[0].Body), &eventBridgeEvent); err == nil {
if len(eventBridgeEvent.Detail.TraceContext) > 0 {
lp.createWrappedEventBridgeSpan(eventBridgeEvent)
}
}
}

// sns span
// createWrappedSNSSpan creates an inferred span for SNS that is wrapped by SQS.
func (lp *LifecycleProcessor) createWrappedSNSSpan(snsEntity events.SNSEntity) {
lp.requestHandler.inferredSpans[1] = &inferredspan.InferredSpan{
CurrentInvocationStartTime: time.Unix(lp.requestHandler.inferredSpans[0].Span.Start, 0),
Span: &pb.Span{
Expand All @@ -179,9 +194,21 @@ func (lp *LifecycleProcessor) initFromSQSEvent(event events.SQSEvent) {
snsEvent.Records[0].SNS = snsEntity

lp.requestHandler.inferredSpans[1].EnrichInferredSpanWithSNSEvent(snsEvent)

lp.requestHandler.inferredSpans[1].Span.Duration = lp.GetInferredSpan().Span.Start - lp.requestHandler.inferredSpans[1].Span.Start
}

// createWrappedEventBridgeSpan creates an inferred span for EventBridge
// that is wrapped by SQS or SNS.
func (lp *LifecycleProcessor) createWrappedEventBridgeSpan(eventBridgeEvent events.EventBridgeEvent) {
lp.requestHandler.inferredSpans[1] = &inferredspan.InferredSpan{
CurrentInvocationStartTime: time.Unix(lp.requestHandler.inferredSpans[0].Span.Start, 0),
Span: &pb.Span{
SpanID: inferredspan.GenerateSpanId(),
},
}

lp.requestHandler.inferredSpans[1].EnrichInferredSpanWithEventBridgeEvent(eventBridgeEvent)
lp.requestHandler.inferredSpans[1].Span.Duration = lp.GetInferredSpan().Span.Start - lp.requestHandler.inferredSpans[1].Span.Start
}

func (lp *LifecycleProcessor) initFromLambdaFunctionURLEvent(event events.LambdaFunctionURLRequest, region string, accountID string, functionName string) {
Expand Down
82 changes: 82 additions & 0 deletions pkg/serverless/invocationlifecycle/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1234,6 +1234,88 @@ func TestTriggerTypesLifecycleEventForEventBridge(t *testing.T) {
}, testProcessor.GetTags())
}

func TestTriggerTypesLifecycleEventForEventBridgeSQS(t *testing.T) {
startInvocationTime := time.Now()
duration := 1 * time.Second
endInvocationTime := startInvocationTime.Add(duration)

var tracePayload *api.Payload

startDetails := &InvocationStartDetails{
InvokeEventRawPayload: getEventFromFile("eventbridgesqs.json"),
InvokedFunctionARN: "arn:aws:lambda:us-east-1:123456789012:function:my-function",
StartTime: startInvocationTime,
}

testProcessor := &LifecycleProcessor{
DetectLambdaLibrary: func() bool { return false },
ProcessTrace: func(payload *api.Payload) { tracePayload = payload },
InferredSpansEnabled: true,
requestHandler: &RequestHandler{
executionInfo: &ExecutionStartInfo{
TraceID: 123,
SamplingPriority: 1,
},
},
}

testProcessor.OnInvokeStart(startDetails)
testProcessor.OnInvokeEnd(&InvocationEndDetails{
RequestID: "test-request-id",
EndTime: endInvocationTime,
IsError: false,
})

spans := tracePayload.TracerPayload.Chunks[0].Spans
assert.Equal(t, 3, len(spans))
eventBridgeSpan, sqsSpan := spans[1], spans[2]
assert.Equal(t, "eventbridge", eventBridgeSpan.Service)
assert.Equal(t, "test-bus", eventBridgeSpan.Resource)
assert.Equal(t, "sqs", sqsSpan.Service)
assert.Equal(t, "test-queue", sqsSpan.Resource)
}

func TestTriggerTypesLifecycleEventForEventBridgeSNS(t *testing.T) {
startInvocationTime := time.Now()
duration := 1 * time.Second
endInvocationTime := startInvocationTime.Add(duration)

var tracePayload *api.Payload

startDetails := &InvocationStartDetails{
InvokeEventRawPayload: getEventFromFile("eventbridgesns.json"),
InvokedFunctionARN: "arn:aws:lambda:us-east-1:123456789012:function:my-function",
StartTime: startInvocationTime,
}

testProcessor := &LifecycleProcessor{
DetectLambdaLibrary: func() bool { return false },
ProcessTrace: func(payload *api.Payload) { tracePayload = payload },
InferredSpansEnabled: true,
requestHandler: &RequestHandler{
executionInfo: &ExecutionStartInfo{
TraceID: 123,
SamplingPriority: 1,
},
},
}

testProcessor.OnInvokeStart(startDetails)
testProcessor.OnInvokeEnd(&InvocationEndDetails{
RequestID: "test-request-id",
EndTime: endInvocationTime,
IsError: false,
})

spans := tracePayload.TracerPayload.Chunks[0].Spans
assert.Equal(t, 3, len(spans))
eventBridgeSpan, snsSpan := spans[1], spans[2]
assert.Equal(t, "eventbridge", eventBridgeSpan.Service)
assert.Equal(t, "test-bus", eventBridgeSpan.Resource)
assert.Equal(t, "sns", snsSpan.Service)
assert.Equal(t, "test-notifier", snsSpan.Resource)
}

// Helper function for reading test file
func getEventFromFile(filename string) []byte {
event, err := os.ReadFile("../trace/testdata/event_samples/" + filename)
Expand Down
28 changes: 24 additions & 4 deletions pkg/serverless/trace/propagation/carriers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
)

const (
awsTraceHeader = "AWSTraceHeader"
datadogSQSHeader = "_datadog"
awsTraceHeader = "AWSTraceHeader"
datadogTraceHeader = "_datadog"

rootPrefix = "Root="
parentPrefix = "Parent="
Expand Down Expand Up @@ -110,9 +110,20 @@ func extractTraceContextfromAWSTraceHeader(value string) (*TraceContext, error)
// sqsMessageCarrier returns the tracer.TextMapReader used to extract trace
// context from the events.SQSMessage type.
func sqsMessageCarrier(event events.SQSMessage) (tracer.TextMapReader, error) {
if attr, ok := event.MessageAttributes[datadogSQSHeader]; ok {
// Check if this is a normal SQS message
if attr, ok := event.MessageAttributes[datadogTraceHeader]; ok {
return sqsMessageAttrCarrier(attr)
}

// Check if this is an EventBridge event sent through SQS
var eventBridgeEvent events.EventBridgeEvent
if err := json.Unmarshal([]byte(event.Body), &eventBridgeEvent); err == nil {
if len(eventBridgeEvent.Detail.TraceContext) > 0 {
return eventBridgeCarrier(eventBridgeEvent)
}
}

// Check if this is an SNS event sent through SQS
return snsSqsMessageCarrier(event)
}

Expand Down Expand Up @@ -164,7 +175,16 @@ func snsSqsMessageCarrier(event events.SQSMessage) (tracer.TextMapReader, error)
// snsEntityCarrier returns the tracer.TextMapReader used to extract trace
// context from the attributes of an events.SNSEntity type.
func snsEntityCarrier(event events.SNSEntity) (tracer.TextMapReader, error) {
msgAttrs, ok := event.MessageAttributes[datadogSQSHeader]
// Check if this is an EventBridge event sent through SNS
var eventBridgeEvent events.EventBridgeEvent
if err := json.Unmarshal([]byte(event.Message), &eventBridgeEvent); err == nil {
nhulston marked this conversation as resolved.
Show resolved Hide resolved
if len(eventBridgeEvent.Detail.TraceContext) > 0 {
return eventBridgeCarrier(eventBridgeEvent)
}
}

// If not, check if this is a regular SNS message with Datadog trace information
msgAttrs, ok := event.MessageAttributes[datadogTraceHeader]
if !ok {
return nil, errorNoDDContextFound
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/serverless/trace/propagation/carriers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,18 @@ func TestSnsEntityCarrier(t *testing.T) {
expMap map[string]string
expErr string
}{
{
name: "eventbridge-through-sns",
event: events.SNSEntity{
Message: `{"detail":{"_datadog":{"x-datadog-trace-id":"123456789","x-datadog-parent-id":"987654321","x-datadog-sampling-priority":"1"}}}`,
},
expMap: map[string]string{
"x-datadog-trace-id": "123456789",
"x-datadog-parent-id": "987654321",
"x-datadog-sampling-priority": "1",
},
expErr: "",
},
{
name: "no-msg-attrs",
event: events.SNSEntity{},
Expand Down Expand Up @@ -689,6 +701,18 @@ func TestSqsMessageCarrier(t *testing.T) {
expMap: headersMapAll,
expErr: nil,
},
{
name: "eventbridge-through-sqs",
event: events.SQSMessage{
Body: `{"detail":{"_datadog":{"x-datadog-trace-id":"123456789","x-datadog-parent-id":"987654321","x-datadog-sampling-priority":"1"}}}`,
},
expMap: map[string]string{
"x-datadog-trace-id": "123456789",
"x-datadog-parent-id": "987654321",
"x-datadog-sampling-priority": "1",
},
expErr: nil,
},
}

for _, tc := range testcases {
Expand Down
17 changes: 17 additions & 0 deletions pkg/serverless/trace/testdata/event_samples/eventbridgesns.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"Records":[
{
"SNS":{
"MessageID":"12345678-90abc-def-1234-567890abcdef",
"Type":"Notification",
"TopicArn":"arn:aws:sns:us-east-1:123456789012:test-notifier",
"MessageAttributes":{

},
"Timestamp":"2024-09-16T19:44:01.713Z",
"Subject":"",
"Message":"{\"version\":\"0\",\"id\":\"12345678-90abc-def-1234-567890abcdef\",\"detail-type\":\"TestDetail\",\"source\":\"com.test.source\",\"account\":\"12345667890\",\"time\":\"2024-09-16T19:44:01Z\",\"region\":\"us-east-1\",\"resources\":[],\"detail\":{\"foo\":\"bar\",\"_datadog\":{\"x-datadog-trace-id\":\"12345\",\"x-datadog-parent-id\":\"67890\",\"x-datadog-sampling-priority\":\"1\",\"x-datadog-start-time\":\"1726515840997\",\"x-datadog-resource-name\":\"test-bus\",\"x-datadog-tags\":\"_dd.p.dm=-1,_dd.p.tid=123567890\"}}}"
}
}
]
}
19 changes: 19 additions & 0 deletions pkg/serverless/trace/testdata/event_samples/eventbridgesqs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"Records":[
{
"ReceiptHandle":"AQEB9RCmPUwKay0Fttcd7JEN1XPUwBq4ixSPWCQ5ne4x2r4SOQmyBy45h08wPSLe3ZXXXqjpAawK0J91O6wu/DsBHFZnYL2CIBbYhnZsYkwiO8XwsDQrf1ZSTTFH7eGwHuVQ2BsX7O+a9m+5THfXl6e7kBhfNTkATxstbr2iVRObgkvmiI9DdoBCsWBHqn8Z48j28ExS4Ov3i1olku6DcTnq6WxBGPMIYz3qX2LEnDFGNwnL6Ldzi/R4C7BJ8qMvsQeXFFAfGuWNjQsO6PKDhKo1eAEzozlcQd5sDtflIeMsNhfi3LusSPudncQ+zS9qUOWKgezKZqVBLbea4Mt1XIpe/e4WL2DVFfU5IE4cjsxrGEF9v2hcGelCrRexEqy+BVi0NLdwyO6R5L1GfU/1NJUVEE9o8wEqtC+0lrwG8xC6eS0=",
"Body":"{\"version\":\"0\",\"id\":\"103310e6-f267-750d-8cdd-6bee88ad2c9c\",\"detail-type\":\"TestDetail\",\"source\":\"com.test.source\",\"account\":\"12345\",\"time\":\"2024-09-16T19:00:27Z\",\"region\":\"us-east-1\",\"resources\":[],\"detail\":{\"foo\":\"bar\",\"_datadog\":{\"x-datadog-trace-id\":\"12345\",\"x-datadog-parent-id\":\"67890\",\"x-datadog-sampling-priority\":\"1\",\"x-datadog-start-time\":\"1726513226645\",\"x-datadog-resource-name\":\"test-bus\",\"x-datadog-tags\":\"_dd.p.dm=-1,_dd.p.tid=1234567800000000\"}}}",
"Attributes":{
"ApproximateReceiveCount":"1",
"SentTimestamp":"1726513227336",
"SenderId":"AIDAIOA2GYWSHW4E2VXIO",
"ApproximateFirstReceiveTimestamp":"1726513227350"
},
"MessageAttributes":{

},
"eventSource": "aws:sqs",
"EventSourceARN":"arn:aws:sqs:us-east-1:123456789012:test-queue"
}
]
}
1 change: 1 addition & 0 deletions pkg/serverless/trigger/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ type SNSEntity struct {
MessageAttributes map[string]interface{}
Timestamp time.Time
Subject string
Message string
}

// SQSEvent mirrors events.SQSEvent type, removing unused fields.
Expand Down
Loading