Delayed processing for ProcessManager.pidToProcessInfo #321

Merged: 6 commits, Jan 24, 2025
Changes from 4 commits
29 changes: 0 additions & 29 deletions processmanager/manager.go
@@ -28,7 +28,6 @@ import (
eim "go.opentelemetry.io/ebpf-profiler/processmanager/execinfomanager"
"go.opentelemetry.io/ebpf-profiler/reporter"
"go.opentelemetry.io/ebpf-profiler/times"
"go.opentelemetry.io/ebpf-profiler/tracehandler"
"go.opentelemetry.io/ebpf-profiler/traceutil"
"go.opentelemetry.io/ebpf-profiler/util"
)
@@ -348,34 +347,6 @@ func (pm *ProcessManager) MaybeNotifyAPMAgent(
return serviceName
}

func (pm *ProcessManager) SymbolizationComplete(traceCaptureKTime times.KTime) {
[Review comment: @christos68k (Member, Author), Jan 23, 2025]

Moved to processinfo.go for consistency (all pidToProcessInfo accessors in one place), renamed to ProcessedUntil, and updated to also clean up pidToProcessInfo.

pm.mu.Lock()
defer pm.mu.Unlock()

nowKTime := times.GetKTime()
log.Debugf("SymbolizationComplete captureKT: %v latency: %v ms",
traceCaptureKTime, (nowKTime-traceCaptureKTime)/1e6)

for pid, pidExitKTime := range pm.exitEvents {
if pidExitKTime > traceCaptureKTime {
continue
}
for _, instance := range pm.interpreters[pid] {
if err := instance.Detach(pm.ebpf, pid); err != nil {
log.Errorf("Failed to handle interpreted process exit for PID %d: %v",
pid, err)
}
}
delete(pm.interpreters, pid)
delete(pm.exitEvents, pid)

log.Debugf("PID %v exit latency %v ms", pid, (nowKTime-pidExitKTime)/1e6)
}
}

// Compile time check to make sure we satisfy the interface.
var _ tracehandler.TraceProcessor = (*ProcessManager)(nil)

// AddSynthIntervalData adds synthetic stack deltas to the manager. This is useful for cases where
// populating the information via the stack delta provider isn't viable, for example because the
// `.eh_frame` section for a binary is broken. If `AddSynthIntervalData` was called for a given
2 changes: 1 addition & 1 deletion processmanager/manager_test.go
@@ -602,7 +602,7 @@ func TestProcExit(t *testing.T) {

populateManager(t, manager)

_ = manager.ProcessPIDExit(testcase.pid)
manager.ProcessPIDExit(testcase.pid)
assert.Equal(t, testcase.deletePidPageMappingCount,
ebpfMockup.deletePidPageMappingCount)
assert.Equal(t, testcase.deleteStackDeltaRangesCount,
67 changes: 50 additions & 17 deletions processmanager/processinfo.go
@@ -32,6 +32,7 @@ import (
"go.opentelemetry.io/ebpf-profiler/reporter"
"go.opentelemetry.io/ebpf-profiler/times"
"go.opentelemetry.io/ebpf-profiler/tpbase"
"go.opentelemetry.io/ebpf-profiler/tracehandler"
"go.opentelemetry.io/ebpf-profiler/util"
)

@@ -506,32 +507,36 @@ func (pm *ProcessManager) synchronizeMappings(pr process.Process,
return newProcess
}

// ProcessPIDExit informs the ProcessManager that a process exited and no longer will be scheduled
// for processing. It also schedules immediate symbolization if the exited PID needs it. exitKTime
// is stored for later processing in SymbolizationComplete when all traces have been collected.
// There can be a race condition if we can not clean up the references for this process
// ProcessPIDExit informs the ProcessManager that a process exited and no longer will be scheduled.
// exitKTime is stored for later processing in ProcessedUntil, when traces up to this time have been
// processed. There can be a race condition if we can not clean up the references for this process
// fast enough and this particular pid is reused again by the system.
// NOTE: Exported only for tracer.
func (pm *ProcessManager) ProcessPIDExit(pid libpf.PID) bool {
func (pm *ProcessManager) ProcessPIDExit(pid libpf.PID) {
exitKTime := times.GetKTime()
[Review comment: Member, Author]

Moved this outside the lock for improved accuracy (there's a debug log in ProcessedUntil that prints exit latency).

log.Debugf("- PID: %v", pid)
defer pm.ebpf.RemoveReportedPID(pid)

pm.mu.Lock()
defer pm.mu.Unlock()

symbolize := false
exitKTime := times.GetKTime()
if pm.interpreterTracerEnabled {
if len(pm.interpreters[pid]) > 0 {
pidExited := false
info, pidExists := pm.pidToProcessInfo[pid]
if pidExists || (pm.interpreterTracerEnabled &&
[Review comment: Member, Author]

Essentially the same logic as before, with these additions:

  1. Don't add exitKTime to pm.exitEvents if an entry for the PID already exists.
  2. Also add exitKTime to pm.exitEvents if pm.pidToProcessInfo[pid] exists, as we want to clean up the latter in a delayed fashion.

len(pm.interpreters[pid]) > 0) {
// ProcessPIDExit may be called multiple times in short succession
// for the same PID, don't update exitKTime if we've previously recorded it.
if _, pidExited = pm.exitEvents[pid]; !pidExited {
pm.exitEvents[pid] = exitKTime
symbolize = true
}
}

info, ok := pm.pidToProcessInfo[pid]
if !ok {
if !pidExists {
[Review comment: Contributor]

To keep the global read & write lock as short as possible, the if !pidExists {..} part should be moved before if pidExists || (pm.interpreterTracerEnabled && len(pm.interpreters[pid]) > 0) {..}.

[Review comment: Contributor]

That would prevent executing if _, pidExited = ... when (pm.interpreterTracerEnabled && len(pm.interpreters[pid]) > 0) is true.

[Review comment: @christos68k (Member, Author), Jan 23, 2025]

As Tim wrote, this would alter the logic. I tried to keep the original semantics as intact as possible to avoid introducing new races. It may be safe to skip writing exitKTime to pm.exitEvents when !pidExists, but we'd first need to carefully examine all subsystem interactions and check for race conditions.

[Review comment: Contributor]

The follow-up is done in #325.

log.Debugf("Skip process exit handling for unknown PID %d", pid)
return symbolize
return
}
if pidExited {
[Review comment: Member, Author]

We don't want to attempt a repeat cleanup for the same PID if we've already performed it.

[Review comment: Contributor]

I think pidExited should be renamed to pidExitProcessed or something similar; this would make it obvious that we want to avoid duplicate work.

[Review comment: Member, Author]

Renamed

log.Debugf("Skip duplicate process exit handling for PID %d", pid)
return
}

// Delete all entries we have for this particular PID from pid_page_to_mapping_info.
[Review comment: @christos68k (Member, Author), Jan 22, 2025]

I kept this cleanup here as there's no immediate need to postpone cleaning up the eBPF map until traceCaptureKTime >= pidExitKTime (unlike pidToProcessInfo). This also makes ProcessedUntil faster than it would be if the map cleanup took place there.

@@ -548,9 +553,6 @@ func (pm *ProcessManager) ProcessPIDExit(pid libpf.PID) bool {
address, pid, err)
}
}
delete(pm.pidToProcessInfo, pid)
[Review comment: @christos68k (Member, Author), Jan 23, 2025]

This is now taking place in ProcessedUntil, delayed until traceCaptureKTime >= exitKTime.


return symbolize
}

func (pm *ProcessManager) SynchronizeProcess(pr process.Process) {
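
The guard discussed in the review threads above (record the exit time only once, skip unknown PIDs, skip duplicate exits) can be sketched in isolation. This is a minimal model, not the profiler's code: `exitOutcome`, `recordExit`, and the plain `map[int]...` types are hypothetical names chosen for illustration, and the eBPF map cleanup is left out.

```go
package main

import "fmt"

// exitOutcome is a hypothetical type describing what the caller should do next.
type exitOutcome int

const (
	exitUnknownPID exitOutcome = iota // PID was never tracked; nothing to clean up
	exitDuplicate                     // exit time already recorded; skip repeat cleanup
	exitRecorded                      // first exit seen for a tracked PID
)

// recordExit stores exitKTime for pid only if no exit time was stored before,
// so repeated exit notifications never overwrite the first timestamp.
// known stands in for pidToProcessInfo, hasInterp for a non-empty interpreter list.
func recordExit(known, hasInterp map[int]bool, exitEvents map[int]uint64,
	pid int, exitKTime uint64) exitOutcome {
	alreadyRecorded := false
	if known[pid] || hasInterp[pid] {
		if _, alreadyRecorded = exitEvents[pid]; !alreadyRecorded {
			exitEvents[pid] = exitKTime
		}
	}
	if !known[pid] {
		return exitUnknownPID
	}
	if alreadyRecorded {
		return exitDuplicate
	}
	return exitRecorded
}

func main() {
	known := map[int]bool{42: true}
	exitEvents := map[int]uint64{}

	fmt.Println(recordExit(known, nil, exitEvents, 42, 100) == exitRecorded)
	fmt.Println(recordExit(known, nil, exitEvents, 42, 200) == exitDuplicate)
	fmt.Println(exitEvents[42]) // still the first timestamp
}
```

In this sketch the first timestamp wins, which matches the stated intent of not updating exitKTime when ProcessPIDExit is called several times for the same PID.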
@@ -670,3 +672,34 @@ func (pm *ProcessManager) ExePathForPID(pid libpf.PID) string {
}
return executable
}

func (pm *ProcessManager) ProcessedUntil(traceCaptureKTime times.KTime) {
pm.mu.Lock()
defer pm.mu.Unlock()

nowKTime := times.GetKTime()
log.Debugf("ProcessedUntil captureKT: %v latency: %v ms",
traceCaptureKTime, (nowKTime-traceCaptureKTime)/1e6)

[Review comment on lines +704 to +710: Contributor]

Hold the lock for as short a time as possible:

Suggested change
pm.mu.Lock()
defer pm.mu.Unlock()
nowKTime := times.GetKTime()
log.Debugf("ProcessedUntil captureKT: %v latency: %v ms",
traceCaptureKTime, (nowKTime-traceCaptureKTime)/1e6)
nowKTime := times.GetKTime()
log.Debugf("ProcessedUntil captureKT: %v latency: %v ms",
traceCaptureKTime, (nowKTime-traceCaptureKTime)/1e6)
pm.mu.Lock()
defer pm.mu.Unlock()

[Review comment: Member, Author]

This can affect the latency measurement, since we're timing before the lock.

for pid, pidExitKTime := range pm.exitEvents {
if pidExitKTime > traceCaptureKTime {
continue
}

delete(pm.pidToProcessInfo, pid)
[Review comment: Member, Author]

Same logic as before with this single-line addition.


for _, instance := range pm.interpreters[pid] {
if err := instance.Detach(pm.ebpf, pid); err != nil {
log.Errorf("Failed to handle interpreted process exit for PID %d: %v",
pid, err)
}
}
delete(pm.interpreters, pid)
delete(pm.exitEvents, pid)

log.Debugf("PID %v exit latency %v ms", pid, (nowKTime-pidExitKTime)/1e6)
}
}

// Compile time check to make sure we satisfy the interface.
var _ tracehandler.TraceProcessor = (*ProcessManager)(nil)
6 changes: 3 additions & 3 deletions tracehandler/tracehandler.go
@@ -47,11 +47,11 @@ type TraceProcessor interface {
// the frame and send the associated metadata to the collection agent.
ConvertTrace(trace *host.Trace) *libpf.Trace

// SymbolizationComplete is called after a group of Trace has been symbolized.
// ProcessedUntil is called periodically after Traces are processed/symbolized.
// It gets the timestamp of when the Traces (if any) were captured. The timestamp
// is in essence an indicator that all Traces until that time have been now processed,
// and any events up to this time can be processed.
SymbolizationComplete(traceCaptureKTime times.KTime)
// and any events and cleanup actions up to this time can be processed.
ProcessedUntil(traceCaptureKTime times.KTime)
}

// traceHandler provides functions for handling new traces and trace count updates
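
A rename like this one is caught at build time by the compile-time interface check that the PR moves into processinfo.go. The pattern can be sketched as follows; `kTime`, `traceProcessor`, and `recordingProcessor` are hypothetical, simplified stand-ins and not the profiler's types.

```go
package main

import "fmt"

// kTime is a hypothetical stand-in for times.KTime.
type kTime int64

// traceProcessor is a reduced, hypothetical version of the interface,
// keeping only the renamed method.
type traceProcessor interface {
	ProcessedUntil(traceCaptureKTime kTime)
}

// recordingProcessor remembers the last timestamp it was given.
type recordingProcessor struct {
	last kTime
}

func (r *recordingProcessor) ProcessedUntil(t kTime) { r.last = t }

// Compile-time check: the build fails if *recordingProcessor ever stops
// satisfying traceProcessor, for example after a method rename on one side.
var _ traceProcessor = (*recordingProcessor)(nil)

func main() {
	r := &recordingProcessor{}
	var p traceProcessor = r
	p.ProcessedUntil(42)
	fmt.Println(r.last)
}
```

The blank-identifier assignment costs nothing at run time, since it only converts a typed nil pointer to the interface type.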
2 changes: 1 addition & 1 deletion tracehandler/tracehandler_test.go
@@ -40,7 +40,7 @@ func (f *fakeTraceProcessor) ConvertTrace(trace *host.Trace) *libpf.Trace {
return &newTrace
}

func (f *fakeTraceProcessor) SymbolizationComplete(times.KTime) {}
func (f *fakeTraceProcessor) ProcessedUntil(times.KTime) {}

func (f *fakeTraceProcessor) MaybeNotifyAPMAgent(*host.Trace, libpf.TraceHash, uint16) string {
return ""
10 changes: 5 additions & 5 deletions tracer/events.go
@@ -190,9 +190,9 @@ func (t *Tracer) startTraceEventMonitor(ctx context.Context,
traceOutChan <- trace
}
// After we've received and processed all trace events, call
// SymbolizationComplete if there is a pending oldKTime that we
// ProcessedUntil if there is a pending oldKTime that we
// haven't yet propagated to the rest of the agent.
// This introduces both an upper bound to SymbolizationComplete
// This introduces both an upper bound to ProcessedUntil
// call frequency (dictated by pollTicker) but also skips calls
// when none are needed (e.g. no trace events have been read).
//
@@ -206,18 +206,18 @@ func (t *Tracer) startTraceEventMonitor(ctx context.Context,
// timestamps t0 < t1 < t2 < t3, this poll loop reads [t3 t1 t2]
// in a first iteration and [t0] in a second iteration. If we use
// the current iteration minKTime we'll call
// SymbolizationComplete(t1) first and t0 next, with t0 < t1.
// ProcessedUntil(t1) first and t0 next, with t0 < t1.
if oldKTime > 0 {
// Ensure that all previously sent trace events have been processed
traceOutChan <- nil

if minKTime > 0 && minKTime <= oldKTime {
// If minKTime is smaller than oldKTime, use it and reset it
// to avoid a repeat during next iteration.
t.TraceProcessor().SymbolizationComplete(minKTime)
t.TraceProcessor().ProcessedUntil(minKTime)
minKTime = 0
} else {
t.TraceProcessor().SymbolizationComplete(oldKTime)
t.TraceProcessor().ProcessedUntil(oldKTime)
}
}
oldKTime = minKTime
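
The timestamp bookkeeping in this loop can be modeled without perf buffers or channels. The following is a sketch, not the tracer's code: `simulate` is a hypothetical helper, each inner slice represents the capture timestamps read in one poll iteration, and resetting minKTime to 0 at the start of every iteration is an assumption, since that part of the loop is not shown in this diff.

```go
package main

import "fmt"

// simulate returns the sequence of timestamps that would be passed to
// ProcessedUntil for the given poll iterations.
func simulate(batches [][]uint64) []uint64 {
	var calls []uint64
	var oldKTime, minKTime uint64

	for _, batch := range batches {
		minKTime = 0 // assumption: reset at the start of each poll iteration
		for _, ts := range batch {
			if minKTime == 0 || ts < minKTime {
				minKTime = ts
			}
		}

		// Report the previous iteration's minimum, unless the current
		// iteration saw an even older event.
		if oldKTime > 0 {
			if minKTime > 0 && minKTime <= oldKTime {
				calls = append(calls, minKTime)
				minKTime = 0 // avoid repeating it in the next iteration
			} else {
				calls = append(calls, oldKTime)
			}
		}
		oldKTime = minKTime
	}
	return calls
}

func main() {
	// Out-of-order read as in the code comment: t0=10 arrives one iteration late.
	fmt.Println(simulate([][]uint64{{40, 20, 30}, {10}, {}}))
	// In-order read followed by an idle iteration.
	fmt.Println(simulate([][]uint64{{10, 20}, {30}, {}}))
}
```

In the out-of-order case the sketch reports only the late, smallest timestamp, so the reported values never go backwards. In the in-order case each iteration's minimum is reported one iteration later.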