Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SVLS-4142] Create a Lambda span on timeouts #21481

Merged
merged 25 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
381f812
create a Lambda span on timeouts
DylanLovesCoffee Dec 11, 2023
831dab7
don't create a cold start span when the runtime restarts during timeouts
DylanLovesCoffee Dec 11, 2023
45c8d83
Merge branch 'main' into dylan/timeout-trace
DylanLovesCoffee Dec 11, 2023
2200b76
fix linting
DylanLovesCoffee Dec 11, 2023
a9ec6a7
fix test
DylanLovesCoffee Dec 11, 2023
71ee55f
lint: rename name variables
DylanLovesCoffee Dec 11, 2023
eb04b12
lint again
DylanLovesCoffee Dec 11, 2023
68632e3
small fixes
DylanLovesCoffee Dec 13, 2023
6c6ce1c
refactor timeout span logic
DylanLovesCoffee Dec 21, 2023
7299681
Merge branch 'main' into dylan/timeout-trace
DylanLovesCoffee Dec 21, 2023
bf23790
add mutexes
DylanLovesCoffee Jan 2, 2024
3cc6e87
Merge branch 'main' into dylan/timeout-trace
DylanLovesCoffee Jan 5, 2024
3f9bf0d
Merge branch 'main' into dylan/timeout-trace
DylanLovesCoffee Jan 8, 2024
65e7c20
fix span completed check
DylanLovesCoffee Jan 8, 2024
f1e2857
revert refactor
DylanLovesCoffee Jan 8, 2024
12eb913
remove cold start span changes
DylanLovesCoffee Jan 9, 2024
4f0543f
use mutex over rwmutex
DylanLovesCoffee Jan 9, 2024
24712f0
test routes
DylanLovesCoffee Jan 9, 2024
05a6392
add comment + update tests
DylanLovesCoffee Jan 10, 2024
2b48fe1
test endExecutionSpan
DylanLovesCoffee Jan 10, 2024
5d27996
add serverless.go test
DylanLovesCoffee Jan 10, 2024
70cbf83
add test /hello for route
DylanLovesCoffee Jan 10, 2024
58516db
Merge branch 'main' into dylan/timeout-trace
DylanLovesCoffee Apr 4, 2024
5292f39
only set span incomplete when /startInvocation has been hit
DylanLovesCoffee Apr 8, 2024
7148b7d
time out -> timeout
DylanLovesCoffee Apr 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/serverless/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func runAgent() {
TraceAgent: serverlessDaemon.TraceAgent,
StopChan: make(chan struct{}),
ColdStartSpanId: coldStartSpanId,
WasColdStart: serverlessDaemon.ExecutionContext.GetCurrentState().WasColdStart,
}

log.Debug("Starting ColdStartSpanCreator")
Expand Down
5 changes: 5 additions & 0 deletions pkg/serverless/appsec/httpsec/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ func (lp *ProxyLifecycleProcessor) GetExecutionInfo() *invocationlifecycle.Execu
return nil // not used in the runtime api proxy case
}

// OnTimeoutInvokeEnd is the timeout hook of the lifecycle processor. It is a
// no-op here: the timeout details are ignored because the hook is not used in
// the runtime api proxy case.
func (lp *ProxyLifecycleProcessor) OnTimeoutInvokeEnd(_ *invocationlifecycle.TimeoutExecutionInfo) {
	// Intentionally empty: not used in the runtime api proxy case.
}

// OnInvokeStart is the hook triggered when an invocation has started
func (lp *ProxyLifecycleProcessor) OnInvokeStart(startDetails *invocationlifecycle.InvocationStartDetails) {
log.Debugf("appsec: proxy-lifecycle: invocation started with raw payload `%s`", startDetails.InvokeEventRawPayload)
Expand Down
11 changes: 11 additions & 0 deletions pkg/serverless/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ type Daemon struct {
// LambdaLibraryDetected represents whether the Datadog Lambda Library was detected in the environment
LambdaLibraryDetected bool

// hitOnStart indicates whether the serverless.StartInvocation route has been hit
hitOnStart bool

// hitOnEnd indicates whether the serverless.EndInvocation route has been hit
hitOnEnd bool

// runtimeStateMutex is used to ensure that modifying the state of the runtime is thread-safe
runtimeStateMutex sync.Mutex

Expand Down Expand Up @@ -435,3 +441,8 @@ func (d *Daemon) setTraceTags(tagMap map[string]string) bool {
}
return false
}

// IsExecutionSpanComplete checks if the execution span was finished during a timeout
func (d *Daemon) IsExecutionSpanComplete() bool {
return !d.LambdaLibraryDetected && d.hitOnStart && d.hitOnEnd
DylanLovesCoffee marked this conversation as resolved.
Show resolved Hide resolved
}
2 changes: 2 additions & 0 deletions pkg/serverless/daemon/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type StartInvocation struct {

func (s *StartInvocation) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Debug("Hit on the serverless.StartInvocation route.")
s.daemon.hitOnStart = true
DylanLovesCoffee marked this conversation as resolved.
Show resolved Hide resolved
startTime := time.Now()
reqBody, err := io.ReadAll(r.Body)
if err != nil {
Expand Down Expand Up @@ -86,6 +87,7 @@ type EndInvocation struct {

func (e *EndInvocation) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Debug("Hit on the serverless.EndInvocation route.")
e.daemon.hitOnEnd = true
purple4reina marked this conversation as resolved.
Show resolved Hide resolved
DylanLovesCoffee marked this conversation as resolved.
Show resolved Hide resolved
endTime := time.Now()
ecs := e.daemon.ExecutionContext.GetCurrentState()
coldStartTags := e.daemon.ExecutionContext.GetColdStartTagsForRequestID(ecs.LastRequestID)
Expand Down
44 changes: 44 additions & 0 deletions pkg/serverless/daemon/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ func (m *mockLifecycleProcessor) OnInvokeEnd(endDetails *invocationlifecycle.Inv
m.lastEndDetails = endDetails
}

// OnTimeoutInvokeEnd records a timed-out invocation on the mock: the invocation
// is flagged as an error, no end details are kept, and OnInvokeEnd is marked as
// not having been called. The timeout details themselves are ignored.
func (m *mockLifecycleProcessor) OnTimeoutInvokeEnd(_ *invocationlifecycle.TimeoutExecutionInfo) {
	m.lastEndDetails = nil
	m.isError = true
	m.OnInvokeEndCalled = false
}

func TestStartInvocation(t *testing.T) {
assert := assert.New(t)
port := testutil.FreeTCPPort(t)
Expand Down Expand Up @@ -127,6 +133,44 @@ func TestEndInvocationWithError(t *testing.T) {
assert.True(m.isError)
}

// TestTimeoutInvocation hits the start-invocation route, then simulates a timeout
// by calling OnTimeoutInvokeEnd on the processor directly, and checks that only
// the start side of the invocation was recorded.
func TestTimeoutInvocation(t *testing.T) {
	assert := assert.New(t)
	port := testutil.FreeTCPPort(t)
	d := StartDaemon(fmt.Sprintf("127.0.0.1:%d", port))
	// Register the cleanup as soon as the daemon exists.
	defer d.Stop()
	// NOTE(review): presumably gives the daemon's HTTP server time to come up — confirm.
	time.Sleep(100 * time.Millisecond)

	m := &mockLifecycleProcessor{}
	d.InvocationProcessor = m
	client := &http.Client{Timeout: 1 * time.Second}
	startURL := fmt.Sprintf("http://127.0.0.1:%d/lambda/start-invocation", port)

	body := bytes.NewBuffer([]byte(`{}`))
	startReq, err := http.NewRequest(http.MethodPost, startURL, body)
	assert.NoError(err)
	res, err := client.Do(startReq)
	assert.NoError(err)
	if res != nil {
		// testify expects (expected, actual); the reverse mislabels failure output.
		assert.Equal(http.StatusOK, res.StatusCode)
		res.Body.Close()
	}

	// The end-invocation route is never hit; the timeout hook is invoked instead.
	d.InvocationProcessor.OnTimeoutInvokeEnd(
		&invocationlifecycle.TimeoutExecutionInfo{
			RequestID:       "123abc",
			Runtime:         "custom",
			IsColdStart:     true,
			IsProactiveInit: true,
		})

	assert.True(m.OnInvokeStartCalled)
	assert.True(d.hitOnStart)
	assert.False(m.OnInvokeEndCalled)
	assert.False(d.hitOnEnd)
	assert.True(m.isError)
	assert.Nil(m.lastEndDetails)
}

func TestTraceContext(t *testing.T) {
assert := assert.New(t)

Expand Down
1 change: 1 addition & 0 deletions pkg/serverless/executioncontext/executioncontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func (ec *ExecutionContext) RestoreCurrentStateFromFile() error {
ec.lastLogRequestID = restoredExecutionContextState.LastLogRequestID
ec.lastOOMRequestID = restoredExecutionContextState.LastOOMRequestID
ec.coldstartRequestID = restoredExecutionContextState.ColdstartRequestID
ec.wasColdStart = restoredExecutionContextState.WasColdStart
DylanLovesCoffee marked this conversation as resolved.
Show resolved Hide resolved
ec.startTime = restoredExecutionContextState.StartTime
ec.endTime = restoredExecutionContextState.EndTime
return nil
Expand Down
2 changes: 2 additions & 0 deletions pkg/serverless/invocationlifecycle/invocation_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type InvocationProcessor interface {
OnInvokeEnd(endDetails *InvocationEndDetails)
// GetExecutionInfo returns the current execution start information
GetExecutionInfo() *ExecutionStartInfo
// OnTimeoutInvokeEnd completes an unfinished execution span during a timeout
OnTimeoutInvokeEnd(timeoutCtx *TimeoutExecutionInfo)
}

// InvocationSubProcessor is the interface to implement to receive invocation lifecycle hooks along with the
Expand Down
61 changes: 40 additions & 21 deletions pkg/serverless/invocationlifecycle/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,27 +281,7 @@ func (lp *LifecycleProcessor) OnInvokeEnd(endDetails *InvocationEndDetails) {
spans = append(spans, span)

if lp.InferredSpansEnabled {
log.Debug("[lifecycle] Attempting to complete the inferred span")
log.Debugf("[lifecycle] Inferred span context: %+v", lp.GetInferredSpan().Span)
if lp.GetInferredSpan().Span.Start != 0 {
span0, span1 := lp.requestHandler.inferredSpans[0], lp.requestHandler.inferredSpans[1]
if span1 != nil {
log.Debug("[lifecycle] Completing a secondary inferred span")
lp.setParentIDForMultipleInferredSpans()
span1.AddTagToInferredSpan("http.status_code", statusCode)
span1.AddTagToInferredSpan("peer.service", lp.GetServiceName())
span := lp.completeInferredSpan(span1, lp.getInferredSpanStart(), endDetails.IsError)
spans = append(spans, span)
log.Debug("[lifecycle] The secondary inferred span attributes are %v", lp.requestHandler.inferredSpans[1])
}
span0.AddTagToInferredSpan("http.status_code", statusCode)
span0.AddTagToInferredSpan("peer.service", lp.GetServiceName())
span := lp.completeInferredSpan(span0, endDetails.EndTime, endDetails.IsError)
spans = append(spans, span)
log.Debugf("[lifecycle] The inferred span attributes are: %v", lp.GetInferredSpan())
} else {
log.Debug("[lifecyle] Failed to complete inferred span due to a missing start time. Please check that the event payload was received with the appropriate data")
}
spans = lp.endInferredSpan(spans, statusCode, endDetails.EndTime, endDetails.IsError)
DylanLovesCoffee marked this conversation as resolved.
Show resolved Hide resolved
}
lp.processTrace(spans)
}
Expand All @@ -313,6 +293,19 @@ func (lp *LifecycleProcessor) OnInvokeEnd(endDetails *InvocationEndDetails) {
}
}

// OnTimeoutInvokeEnd closes out the execution span of an invocation that timed
// out and hands the resulting spans to processTrace. When inferred spans are
// enabled they are completed as well, flagged as errors and ended at the
// current time.
func (lp *LifecycleProcessor) OnTimeoutInvokeEnd(timeoutContext *TimeoutExecutionInfo) {
	// Capacity 3 leaves room for the execution span plus up to two inferred spans.
	trace := append(make([]*pb.Span, 0, 3), lp.endExecutionSpanOnTimeout(timeoutContext))

	if lp.InferredSpansEnabled {
		// No response status code can be retrieved in a timeout so we pass an empty string
		trace = lp.endInferredSpan(trace, "", time.Now(), true)
	}

	lp.processTrace(trace)
}

// GetTags returns the tagset of the currently executing lambda function
func (lp *LifecycleProcessor) GetTags() map[string]string {
return lp.requestHandler.triggerTags
Expand Down Expand Up @@ -385,3 +378,29 @@ func (lp *LifecycleProcessor) setParentIDForMultipleInferredSpans() {
lp.requestHandler.inferredSpans[1].Span.ParentID = lp.requestHandler.inferredSpans[0].Span.ParentID
lp.requestHandler.inferredSpans[0].Span.ParentID = lp.requestHandler.inferredSpans[1].Span.SpanID
}

// endInferredSpan attempts to complete any inferred spans and appends them to spans.
//
// statusCode is written to the "http.status_code" tag of every completed inferred
// span, endTime is the end time of the primary inferred span, and isError is
// forwarded to completeInferredSpan. The (possibly extended) slice is returned;
// when the inferred span has no start time, spans is returned unchanged.
func (lp *LifecycleProcessor) endInferredSpan(spans []*pb.Span, statusCode string, endTime time.Time, isError bool) []*pb.Span {
	log.Debug("[lifecycle] Attempting to complete the inferred span")
	log.Debugf("[lifecycle] Inferred span context: %+v", lp.GetInferredSpan().Span)

	// Without a start time there is nothing to complete.
	if lp.GetInferredSpan().Span.Start == 0 {
		log.Debug("[lifecycle] Failed to complete inferred span due to a missing start time. Please check that the event payload was received with the appropriate data")
		return spans
	}

	span0, span1 := lp.requestHandler.inferredSpans[0], lp.requestHandler.inferredSpans[1]
	if span1 != nil {
		log.Debug("[lifecycle] Completing a secondary inferred span")
		lp.setParentIDForMultipleInferredSpans()
		span1.AddTagToInferredSpan("http.status_code", statusCode)
		span1.AddTagToInferredSpan("peer.service", lp.GetServiceName())
		// The secondary span is ended at the value returned by getInferredSpanStart,
		// not at endTime.
		span := lp.completeInferredSpan(span1, lp.getInferredSpanStart(), isError)
		spans = append(spans, span)
		// Debugf, not Debug: the message carries a format verb.
		log.Debugf("[lifecycle] The secondary inferred span attributes are %v", lp.requestHandler.inferredSpans[1])
	}

	span0.AddTagToInferredSpan("http.status_code", statusCode)
	span0.AddTagToInferredSpan("peer.service", lp.GetServiceName())
	span := lp.completeInferredSpan(span0, endTime, isError)
	spans = append(spans, span)
	log.Debugf("[lifecycle] The inferred span attributes are: %v", lp.GetInferredSpan())
	return spans
}
103 changes: 103 additions & 0 deletions pkg/serverless/invocationlifecycle/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,109 @@ func TestCompleteInferredSpanWithOutStartTime(t *testing.T) {
completedInferredSpan := tracePayload.TracerPayload.Chunks[0].Spans[0]
assert.Equal(t, startInvocationTime.UnixNano(), completedInferredSpan.Start)
}

// TestTimeoutExecutionSpan verifies that, when the invocation payload carries no
// trace context, OnTimeoutInvokeEnd emits a single errored execution span with
// newly created trace and span IDs.
func TestTimeoutExecutionSpan(t *testing.T) {
	t.Setenv(functionNameEnvVar, "my-function")
	t.Setenv("DD_SERVICE", "mock-lambda-service")

	extraTags := &logs.Tags{
		Tags: []string{"functionname:test-function"},
	}
	// Named mockLog so the local does not shadow the log package.
	mockLog := fxutil.Test[log.Component](t, logimpl.MockModule())
	demux := aggregator.InitTestAgentDemultiplexerWithFlushInterval(mockLog, time.Hour)
	mockDetectLambdaLibrary := func() bool { return false }

	var tracePayload *api.Payload
	mockProcessTrace := func(payload *api.Payload) {
		tracePayload = payload
	}

	testProcessor := LifecycleProcessor{
		ExtraTags:            extraTags,
		ProcessTrace:         mockProcessTrace,
		DetectLambdaLibrary:  mockDetectLambdaLibrary,
		Demux:                demux,
		InferredSpansEnabled: true,
	}
	startDetails := InvocationStartDetails{
		StartTime:             time.Now(),
		InvokeEventRawPayload: []byte(`{}`),
		InvokedFunctionARN:    "arn:aws:lambda:us-east-1:123456789012:function:my-function",
	}
	testProcessor.OnInvokeStart(&startDetails)

	timeoutCtx := &TimeoutExecutionInfo{
		RequestID:       "test-request-id",
		Runtime:         "java11",
		IsColdStart:     false,
		IsProactiveInit: false,
	}
	testProcessor.OnTimeoutInvokeEnd(timeoutCtx)

	// Fail cleanly instead of panicking on a nil payload if no trace was processed.
	if !assert.NotNil(t, tracePayload) {
		return
	}
	spans := tracePayload.TracerPayload.Chunks[0].Spans
	// Stop here on a length mismatch so spans[0] below cannot panic.
	if !assert.Len(t, spans, 1) {
		return
	}
	// No trace context passed
	assert.Equal(t, uint64(0), testProcessor.GetExecutionInfo().TraceID)
	assert.Equal(t, uint64(0), testProcessor.GetExecutionInfo().SpanID)
	assert.Equal(t, int32(-128), tracePayload.TracerPayload.Chunks[0].Priority)
	// New trace ID and span ID has been created
	assert.NotEqual(t, uint64(0), spans[0].TraceID)
	assert.NotEqual(t, uint64(0), spans[0].SpanID)
	assert.Equal(t, int32(1), spans[0].Error)
	assert.Equal(t, "test-request-id", spans[0].GetMeta()["request_id"])
	assert.Equal(t, "java", spans[0].GetMeta()["language"])
}

// TestTimeoutExecutionSpanWithTraceContext verifies that, when the invocation
// payload carries Datadog trace headers, the errored execution span emitted by
// OnTimeoutInvokeEnd matches the trace ID, parent ID and sampling priority held
// in the processor's execution info.
func TestTimeoutExecutionSpanWithTraceContext(t *testing.T) {
	t.Setenv(functionNameEnvVar, "my-function")
	t.Setenv("DD_SERVICE", "mock-lambda-service")

	extraTags := &logs.Tags{
		Tags: []string{"functionname:test-function"},
	}
	// Named mockLog so the local does not shadow the log package.
	mockLog := fxutil.Test[log.Component](t, logimpl.MockModule())
	demux := aggregator.InitTestAgentDemultiplexerWithFlushInterval(mockLog, time.Hour)
	mockDetectLambdaLibrary := func() bool { return false }

	var tracePayload *api.Payload
	mockProcessTrace := func(payload *api.Payload) {
		tracePayload = payload
	}

	testProcessor := LifecycleProcessor{
		ExtraTags:            extraTags,
		ProcessTrace:         mockProcessTrace,
		DetectLambdaLibrary:  mockDetectLambdaLibrary,
		Demux:                demux,
		InferredSpansEnabled: true,
	}
	// The JSON event is deliberately wrapped in extra leading/trailing bytes.
	eventPayload := `a5a{"resource":"/users/create","path":"/users/create","httpMethod":"GET","headers":{"Accept":"*/*","Accept-Encoding":"gzip","x-datadog-parent-id":"1480558859903409531","x-datadog-sampling-priority":"1","x-datadog-trace-id":"5736943178450432258"}}0`
	startDetails := InvocationStartDetails{
		StartTime:             time.Now(),
		InvokeEventRawPayload: []byte(eventPayload),
		InvokedFunctionARN:    "arn:aws:lambda:us-east-1:123456789012:function:my-function",
	}
	testProcessor.OnInvokeStart(&startDetails)

	timeoutCtx := &TimeoutExecutionInfo{
		RequestID:       "test-request-id",
		Runtime:         "java11",
		IsColdStart:     false,
		IsProactiveInit: false,
	}
	testProcessor.OnTimeoutInvokeEnd(timeoutCtx)

	// Fail cleanly instead of panicking on a nil payload if no trace was processed.
	if !assert.NotNil(t, tracePayload) {
		return
	}
	spans := tracePayload.TracerPayload.Chunks[0].Spans
	// Stop here on a length mismatch so spans[0] below cannot panic.
	if !assert.Len(t, spans, 1) {
		return
	}
	// Trace context received
	assert.Equal(t, testProcessor.GetExecutionInfo().TraceID, spans[0].GetTraceID())
	assert.Equal(t, testProcessor.GetExecutionInfo().parentID, spans[0].GetParentID())
	assert.Equal(t, int32(testProcessor.GetExecutionInfo().SamplingPriority), tracePayload.TracerPayload.Chunks[0].Priority)
	assert.Equal(t, int32(1), spans[0].Error)
	assert.Equal(t, "test-request-id", spans[0].GetMeta()["request_id"])
	assert.Equal(t, "java", spans[0].GetMeta()["language"])
}

func TestTriggerTypesLifecycleEventForAPIGatewayRest(t *testing.T) {
startDetails := &InvocationStartDetails{
InvokeEventRawPayload: getEventFromFile("api-gateway.json"),
Expand Down
Loading
Loading