Skip to content

Commit

Permalink
fixup: reporter: support different profile origins
Browse files Browse the repository at this point in the history
Signed-off-by: Florian Lehner <florian.lehner@elastic.co>
  • Loading branch information
florianl committed Dec 11, 2024
1 parent c1a56c3 commit 3af4c74
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 23 deletions.
47 changes: 34 additions & 13 deletions reporter/internal/pdata/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"go.opentelemetry.io/ebpf-profiler/libpf"
"go.opentelemetry.io/ebpf-profiler/reporter/internal/samples"
"go.opentelemetry.io/ebpf-profiler/support"
)

const (
Expand All @@ -24,14 +25,15 @@ const (

// Generate generates a pdata request out of internal profiles data, to be
// exported.
func (p Pdata) Generate(events map[samples.TraceAndMetaKey]*samples.TraceEvents) pprofile.Profiles {
func (p Pdata) Generate(events map[int]samples.KeyToEventMapping) pprofile.Profiles {
profiles := pprofile.NewProfiles()
rp := profiles.ResourceProfiles().AppendEmpty()
sp := rp.ScopeProfiles().AppendEmpty()
prof := sp.Profiles().AppendEmpty()
prof.SetProfileID(pprofile.ProfileID(mkProfileID()))
p.setProfile(events, prof)

for _, origin := range []int{support.TraceOriginSampling, support.TraceOriginOffCPU} {
prof := sp.Profiles().AppendEmpty()
prof.SetProfileID(pprofile.ProfileID(mkProfileID()))
p.setProfile(origin, events[origin], prof)
}
return profiles
}

Expand All @@ -48,6 +50,7 @@ func mkProfileID() []byte {
// setProfile sets the data an OTLP profile with all collected samples up to
// this moment.
func (p *Pdata) setProfile(
origin int,
events map[samples.TraceAndMetaKey]*samples.TraceEvents,
profile pprofile.Profile,
) {
Expand All @@ -62,13 +65,23 @@ func (p *Pdata) setProfile(
funcMap[samples.FuncInfo{Name: "", FileName: ""}] = 0

st := profile.SampleType().AppendEmpty()
st.SetTypeStrindex(getStringMapIndex(stringMap, "samples"))
st.SetUnitStrindex(getStringMapIndex(stringMap, "count"))

pt := profile.PeriodType()
pt.SetTypeStrindex(getStringMapIndex(stringMap, "cpu"))
pt.SetUnitStrindex(getStringMapIndex(stringMap, "nanoseconds"))
profile.SetPeriod(1e9 / int64(p.samplesPerSecond))
switch origin {
case support.TraceOriginSampling:
st.SetTypeStrindex(getStringMapIndex(stringMap, "samples"))
st.SetUnitStrindex(getStringMapIndex(stringMap, "count"))

pt := profile.PeriodType()
pt.SetTypeStrindex(getStringMapIndex(stringMap, "cpu"))
pt.SetUnitStrindex(getStringMapIndex(stringMap, "nanoseconds"))

profile.SetPeriod(1e9 / int64(p.samplesPerSecond))
case support.TraceOriginOffCPU:
st.SetTypeStrindex(getStringMapIndex(stringMap, "events"))
st.SetUnitStrindex(getStringMapIndex(stringMap, "nanoseconds"))
default:
log.Errorf("Generating profile for unsupported origin %d", origin)
return
}

// Temporary lookup to reference existing Mappings.
fileIDtoMapping := make(map[libpf.FileID]int32)
Expand All @@ -85,7 +98,15 @@ func (p *Pdata) setProfile(
endTS = pcommon.Timestamp(traceInfo.Timestamps[len(traceInfo.Timestamps)-1])

sample.TimestampsUnixNano().FromRaw(traceInfo.Timestamps)
sample.Value().Append(1)

switch origin {
case support.TraceOriginSampling:
sample.Value().Append(1)
case support.TraceOriginOffCPU:
for _, offTime := range traceInfo.OffTimes {
sample.Value().Append(int64(offTime))
}
}

// Walk every frame of the trace.
for i := range traceInfo.FrameTypes {
Expand Down
4 changes: 4 additions & 0 deletions reporter/internal/samples/samples.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type TraceEvents struct {
MappingEnds []libpf.Address
MappingFileOffsets []uint64
Timestamps []uint64 // in nanoseconds
OffTimes []uint64 // in nanoseconds
}

// TraceAndMetaKey is the deduplication key for samples. This **must always**
Expand All @@ -45,6 +46,9 @@ type TraceAndMetaKey struct {
ExtraMeta any
}

// KeyToEventMapping supports temporary mapping traces to additional information.
type KeyToEventMapping map[TraceAndMetaKey]*TraceEvents

// AttrKeyValue is a helper to populate Profile.attribute_table.
type AttrKeyValue[T string | int64] struct {
Key string
Expand Down
40 changes: 30 additions & 10 deletions reporter/otlp_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/ebpf-profiler/libpf/xsync"
"go.opentelemetry.io/ebpf-profiler/reporter/internal/pdata"
"go.opentelemetry.io/ebpf-profiler/reporter/internal/samples"
"go.opentelemetry.io/ebpf-profiler/support"
)

// Assert that we implement the full Reporter interface.
Expand Down Expand Up @@ -61,7 +62,8 @@ type OTLPReporter struct {
pdata *pdata.Pdata

// traceEvents stores reported trace events (trace metadata with frames and counts)
traceEvents xsync.RWMutex[map[samples.TraceAndMetaKey]*samples.TraceEvents]
// from various origins.
traceEvents xsync.RWMutex[map[int]samples.KeyToEventMapping]

// pkgGRPCOperationTimeout sets the time limit for GRPC requests.
pkgGRPCOperationTimeout time.Duration
Expand Down Expand Up @@ -107,6 +109,12 @@ func NewOTLP(cfg *Config) (*OTLPReporter, error) {
return nil, err
}

originsMap := make(map[int]samples.KeyToEventMapping, 2)
for _, origin := range []int{support.TraceOriginSampling,
support.TraceOriginOffCPU} {
originsMap[origin] = make(samples.KeyToEventMapping)
}

return &OTLPReporter{
config: cfg,
name: cfg.Name,
Expand All @@ -123,10 +131,8 @@ func NewOTLP(cfg *Config) (*OTLPReporter, error) {
rpcStats: NewStatsHandler(),
pdata: data,
hostmetadata: hostmetadata,
traceEvents: xsync.NewRWMutex(
map[samples.TraceAndMetaKey]*samples.TraceEvents{},
),
cgroupv2ID: cgroupv2ID,
traceEvents: xsync.NewRWMutex(originsMap),
cgroupv2ID: cgroupv2ID,
}, nil
}

Expand All @@ -141,8 +147,11 @@ func (r *OTLPReporter) SupportsReportTraceEvent() bool { return true }

// ReportTraceEvent enqueues reported trace events for the OTLP reporter.
func (r *OTLPReporter) ReportTraceEvent(trace *libpf.Trace, meta *TraceEventMeta) {
traceEventsMap := r.traceEvents.WLock()
defer r.traceEvents.WUnlock(&traceEventsMap)
if meta.Origin != support.TraceOriginSampling && meta.Origin != support.TraceOriginOffCPU {
// At the moment only on-CPU and off-CPU traces are reported.
log.Errorf("Skip reporting trace for unexpected %d origin", meta.Origin)
return
}

var extraMeta any
if r.config.ExtraSampleAttrProd != nil {
Expand All @@ -165,20 +174,25 @@ func (r *OTLPReporter) ReportTraceEvent(trace *libpf.Trace, meta *TraceEventMeta
ExtraMeta: extraMeta,
}

if events, exists := (*traceEventsMap)[key]; exists {
traceEventsMap := r.traceEvents.WLock()
defer r.traceEvents.WUnlock(&traceEventsMap)

if events, exists := (*traceEventsMap)[meta.Origin][key]; exists {
events.Timestamps = append(events.Timestamps, uint64(meta.Timestamp))
(*traceEventsMap)[key] = events
events.OffTimes = append(events.OffTimes, meta.OffTime)
(*traceEventsMap)[meta.Origin][key] = events
return
}

(*traceEventsMap)[key] = &samples.TraceEvents{
(*traceEventsMap)[meta.Origin][key] = &samples.TraceEvents{
Files: trace.Files,
Linenos: trace.Linenos,
FrameTypes: trace.FrameTypes,
MappingStarts: trace.MappingStart,
MappingEnds: trace.MappingEnd,
MappingFileOffsets: trace.MappingFileOffsets,
Timestamps: []uint64{uint64(meta.Timestamp)},
OffTimes: []uint64{meta.OffTime},
}
}

Expand Down Expand Up @@ -342,7 +356,13 @@ func (r *OTLPReporter) Start(ctx context.Context) error {
func (r *OTLPReporter) reportOTLPProfile(ctx context.Context) error {
traceEvents := r.traceEvents.WLock()
events := maps.Clone(*traceEvents)
originsMap := make(map[int]samples.KeyToEventMapping, 2)
clear(*traceEvents)
for _, origin := range []int{support.TraceOriginSampling,
support.TraceOriginOffCPU} {
originsMap[origin] = make(samples.KeyToEventMapping)
}
*traceEvents = originsMap
r.traceEvents.WUnlock(&traceEvents)

profiles := r.pdata.Generate(events)
Expand Down

0 comments on commit 3af4c74

Please sign in to comment.