Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ebpf): merge equal samples #2788

Merged
merged 2 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ bin
/phlare
/pyroscope
/profilecli
/playground
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
Expand Down
9 changes: 6 additions & 3 deletions ebpf/cmd/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ var (
)

func main() {

config = getConfig()
metrics = ebpfmetrics.New(prometheus.DefaultRegisterer)

Expand Down Expand Up @@ -78,10 +77,14 @@ func main() {

func collectProfiles(profiles chan *pushv1.PushRequest) {
builders := pprof.NewProfileBuilders(int64(config.SampleRate))
err := session.CollectProfiles(func(target *sd.Target, stack []string, value uint64, pid uint32) {
err := session.CollectProfiles(func(target *sd.Target, stack []string, value uint64, pid uint32, aggregation ebpfspy.SampleAggregation) {
labelsHash, labels := target.Labels()
builder := builders.BuilderForTarget(labelsHash, labels)
builder.AddSample(stack, value)
if aggregation == ebpfspy.SampleAggregated {
builder.CreateSample(stack, value)
} else {
builder.CreateSampleOrAddValue(stack, value)
}
})

if err != nil {
Expand Down
76 changes: 68 additions & 8 deletions ebpf/pprof/pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package pprof
import (
"fmt"
"io"
"reflect"
"sync"
"time"
"unsafe"

"github.com/cespare/xxhash/v2"
"github.com/google/pprof/profile"
"github.com/klauspost/compress/gzip"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -39,9 +42,10 @@ func (b ProfileBuilders) BuilderForTarget(hash uint64, labels labels.Labels) *Pr
}

builder := &ProfileBuilder{
locations: make(map[string]*profile.Location),
functions: make(map[string]*profile.Function),
Labels: labels,
locations: make(map[string]*profile.Location),
functions: make(map[string]*profile.Function),
sampleHashToSample: make(map[uint64]*profile.Sample),
Labels: labels,
Profile: &profile.Profile{
Mapping: []*profile.Mapping{
{
Expand All @@ -53,20 +57,28 @@ func (b ProfileBuilders) BuilderForTarget(hash uint64, labels labels.Labels) *Pr
PeriodType: &profile.ValueType{Type: "cpu", Unit: "nanoseconds"},
TimeNanos: time.Now().UnixNano(),
},
hash: xxhash.New(),
tmpLocationIDs: make([]uint64, 0, 128),
}
res = builder
b.Builders[hash] = res
return res
}

type ProfileBuilder struct {
locations map[string]*profile.Location
functions map[string]*profile.Function
Profile *profile.Profile
Labels labels.Labels
locations map[string]*profile.Location
functions map[string]*profile.Function
sampleHashToSample map[uint64]*profile.Sample
Profile *profile.Profile
Labels labels.Labels

hash *xxhash.Digest
b [8]byte
tmpLocations []*profile.Location
tmpLocationIDs []uint64
}

func (p *ProfileBuilder) AddSample(stacktrace []string, value uint64) {
func (p *ProfileBuilder) CreateSample(stacktrace []string, value uint64) {
sample := &profile.Sample{
Value: []int64{int64(value) * p.Profile.Period},
}
Expand All @@ -77,6 +89,42 @@ func (p *ProfileBuilder) AddSample(stacktrace []string, value uint64) {
p.Profile.Sample = append(p.Profile.Sample, sample)
}

// CreateSampleOrAddValue records value for stacktrace. The value is first
// multiplied by p.Profile.Period. If a sample with the same hash of location
// IDs is already registered in p.sampleHashToSample, the scaled value is added
// to that sample; otherwise a new sample is created, registered under the hash
// and appended to p.Profile.Sample.
//
// Samples are keyed only by the 64-bit xxhash of their location IDs, so two
// different stacks whose hashes collide would be merged into one sample.
func (p *ProfileBuilder) CreateSampleOrAddValue(stacktrace []string, value uint64) {
scaledValue := int64(value) * p.Profile.Period
// Reuse the scratch slices when their capacity is sufficient; otherwise
// allocate new ones sized for this stack.
if cap(p.tmpLocations) < len(stacktrace) {
p.tmpLocations = make([]*profile.Location, 0, len(stacktrace))
} else {
p.tmpLocations = p.tmpLocations[:0]
}
if cap(p.tmpLocationIDs) < len(stacktrace) {
p.tmpLocationIDs = make([]uint64, 0, len(stacktrace))
} else {
p.tmpLocationIDs = p.tmpLocationIDs[:0]
}
// Resolve every frame to its location and collect the IDs that form the hash input.
for _, s := range stacktrace {
loc := p.addLocation(s)
p.tmpLocations = append(p.tmpLocations, loc)
p.tmpLocationIDs = append(p.tmpLocationIDs, loc.ID)
}
// Hash the raw bytes of the location ID slice with the reusable digest.
// NOTE(review): presumably xxhash's Write never returns a non-nil error, which
// would make this panic unreachable — confirm against the xxhash documentation.
p.hash.Reset()
if _, err := p.hash.Write(uint64Bytes(p.tmpLocationIDs)); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to Write? Is there a way to Sum64([]byte)?

panic(err)
}
h := p.hash.Sum64()
sample := p.sampleHashToSample[h]
if sample != nil {
// Known stack: accumulate into the existing sample, no new sample is created.
sample.Value[0] += scaledValue
return
}
sample = &profile.Sample{
Location: p.tmpLocations,
Value: []int64{scaledValue},
}
p.sampleHashToSample[h] = sample
// The new sample retains p.tmpLocations as its Location slice. Drop our
// reference so the next call allocates a fresh slice instead of truncating and
// overwriting the locations of this sample.
p.tmpLocations = nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must be a leftover?

p.Profile.Sample = append(p.Profile.Sample, sample)
}

func (p *ProfileBuilder) addLocation(function string) *profile.Location {
loc, ok := p.locations[function]
if ok {
Expand Down Expand Up @@ -131,3 +179,15 @@ func (p *ProfileBuilder) Write(dst io.Writer) (int64, error) {
}
return 0, nil
}

// uint64Bytes returns a byte slice that views the memory of s without copying:
// the result has length and capacity 8*len(s) and shares storage with s, so a
// write through either slice is visible through the other. The byte order is
// the machine's native one. An empty or nil s yields nil.
func uint64Bytes(s []uint64) []byte {
	n := len(s)
	if n == 0 {
		return nil
	}
	size := n * 8
	var view []byte
	// Patch the header of a real slice variable so that it points at the
	// first element of s and spans all of its bytes.
	header := (*reflect.SliceHeader)(unsafe.Pointer(&view))
	header.Data = uintptr(unsafe.Pointer(&s[0]))
	header.Len = size
	header.Cap = size
	return view
}
73 changes: 62 additions & 11 deletions ebpf/pprof/pprof_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package pprof

import (
"bytes"
"fmt"
"strings"
"testing"
"time"

"github.com/google/pprof/profile"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand All @@ -17,22 +20,72 @@ func TestBackAndForth(t *testing.T) {
builders := NewProfileBuilders(97)

builder := builders.BuilderForTarget(1, labels.Labels{{Name: "foo", Value: "bar"}})
builder.AddSample([]string{"a", "b", "c"}, 239)
builder.AddSample([]string{"a", "b", "d"}, 4242)
builder.CreateSample([]string{"a", "b", "c"}, 239)
builder.CreateSample([]string{"a", "b", "d"}, 4242)

buf := bytes.NewBuffer(nil)
_, err := builder.Write(buf)
require.NoError(t, err)
assert.NoError(t, err)

rawProfile := buf.Bytes()

parsed, err := profile.Parse(bytes.NewBuffer(rawProfile))
require.NoError(t, err)
assert.NoError(t, err)
require.NotNil(t, parsed)
require.Equal(t, 2, len(parsed.Sample))
require.Equal(t, 4, len(parsed.Function))
require.Equal(t, 4, len(parsed.Location))
assert.Equal(t, 2, len(parsed.Sample))
assert.Equal(t, 4, len(parsed.Function))
assert.Equal(t, 4, len(parsed.Location))

stacks := stackCollapse(parsed)

assert.Equal(t, 239*period, stacks["a;b;c"])
assert.Equal(t, 4242*period, stacks["a;b;d"])
}

// TestMergeSamples verifies that CreateSampleOrAddValue merges repeated stacks
// into a single sample while keeping distinct stacks (including a prefix of
// another stack and a very deep stack) separate, and that the merged values
// survive a Write/Parse round trip.
func TestMergeSamples(t *testing.T) {
	const (
		sampleRate     = 97
		repeats        = 14
		longStackDepth = 512
	)
	period := time.Second.Nanoseconds() / int64(sampleRate)

	builders := NewProfileBuilders(sampleRate)

	builder := builders.BuilderForTarget(1, nil)
	builder.CreateSampleOrAddValue([]string{"a", "b", "d"}, 4242)

	// The same stack reported repeatedly must collapse into one sample.
	for i := 0; i < repeats; i++ {
		builder.CreateSampleOrAddValue([]string{"a", "b", "c"}, 239)
	}

	// A stack deeper than the builder's preallocated scratch capacity.
	longStack := make([]string, 0, longStackDepth)
	for i := 0; i < longStackDepth; i++ {
		longStack = append(longStack, fmt.Sprintf("l_%d", i))
	}
	builder.CreateSampleOrAddValue(longStack, 3)
	// A strict prefix of the stacks above must stay a separate sample.
	builder.CreateSampleOrAddValue([]string{"a", "b"}, 42)

	assert.Equal(t, 4, len(builder.Profile.Sample))

	buf := bytes.NewBuffer(nil)
	_, err := builder.Write(buf)
	assert.NoError(t, err)
	rawProfile := buf.Bytes()

	parsed, err := profile.Parse(bytes.NewBuffer(rawProfile))
	assert.NoError(t, err)
	require.NotNil(t, parsed)
	assert.Equal(t, 4, len(parsed.Sample))
	// Four distinct short frames (a, b, c, d) plus the frames of the long stack.
	assert.Equal(t, 4+longStackDepth, len(parsed.Function))
	assert.Equal(t, 4+longStackDepth, len(parsed.Location))

	stacks := stackCollapse(parsed)

	assert.Equal(t, repeats*239*period, stacks["a;b;c"])
	assert.Equal(t, 4242*period, stacks["a;b;d"])
	assert.Equal(t, 42*period, stacks["a;b"])
	assert.Equal(t, 3*period, stacks[strings.Join(longStack, ";")])
}

func stackCollapse(parsed *profile.Profile) map[string]int64 {
stacks := map[string]int64{}
for _, sample := range parsed.Sample {
stack := ""
Expand All @@ -42,9 +95,7 @@ func TestBackAndForth(t *testing.T) {
}
stack += location.Line[0].Function.Name
}
stacks[stack] = sample.Value[0]
stacks[stack] += sample.Value[0]
}

require.Equal(t, 239*period, stacks["a;b;c"])
require.Equal(t, 4242*period, stacks["a;b;d"])
return stacks
}
2 changes: 1 addition & 1 deletion ebpf/python_ebpf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func compareProfiles(t *testing.T, l log.Logger, expected []byte, actual map[str
func collectProfiles(t *testing.T, l log.Logger, profiler Session) map[string]struct{} {
l = log.With(l, "component", "profiles")
profiles := map[string]struct{}{}
err := profiler.CollectProfiles(func(target *sd.Target, stack []string, value uint64, pid uint32) {
err := profiler.CollectProfiles(func(target *sd.Target, stack []string, value uint64, pid uint32, _ SampleAggregation) {
lo.Reverse(stack)
sample := strings.Join(stack, ";")
profiles[sample] = struct{}{}
Expand Down
20 changes: 16 additions & 4 deletions ebpf/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,24 @@ type SessionOptions struct {
SampleRate int
}

// SampleAggregation is passed to a CollectProfilesCallback to tell it whether
// the reported sample value has already been aggregated.
type SampleAggregation bool

var (
// SampleAggregated means samples are accumulated in ebpf, so there is no need to dedup them.
SampleAggregated = SampleAggregation(true)
// SampleNotAggregated means values are not accumulated in ebpf, but streamed to userspace with value=1.
// TODO: consider aggregating python in ebpf as well.
SampleNotAggregated = SampleAggregation(false)
)

// CollectProfilesCallback is the callback type accepted by
// Session.CollectProfiles. It receives the target, stack, value and pid of a
// collected sample, together with the aggregation mode of that value.
type CollectProfilesCallback func(target *sd.Target, stack []string, value uint64, pid uint32, aggregation SampleAggregation)

type Session interface {
Start() error
Stop()
Update(SessionOptions) error
UpdateTargets(args sd.TargetsOptions)
CollectProfiles(f func(target *sd.Target, stack []string, value uint64, pid uint32)) error
CollectProfiles(f CollectProfilesCallback) error
DebugInfo() interface{}
}

Expand Down Expand Up @@ -225,7 +237,7 @@ func (s *session) UpdateTargets(args sd.TargetsOptions) {
}
}

func (s *session) CollectProfiles(cb func(t *sd.Target, stack []string, value uint64, pid uint32)) error {
func (s *session) CollectProfiles(cb CollectProfilesCallback) error {
s.mutex.Lock()
defer s.mutex.Unlock()

Expand Down Expand Up @@ -257,7 +269,7 @@ func (s *session) DebugInfo() interface{} {
}
}

func (s *session) collectRegularProfile(cb func(t *sd.Target, stack []string, value uint64, pid uint32)) error {
func (s *session) collectRegularProfile(cb CollectProfilesCallback) error {
sb := &stackBuilder{}

keys, values, batch, err := s.getCountsMapValues()
Expand Down Expand Up @@ -315,7 +327,7 @@ func (s *session) collectRegularProfile(cb func(t *sd.Target, stack []string, va
continue // only comm
}
lo.Reverse(sb.stack)
cb(labels, sb.stack, uint64(value), ck.Pid)
cb(labels, sb.stack, uint64(value), ck.Pid, SampleAggregated)
s.collectMetrics(labels, &stats, sb)
}

Expand Down
4 changes: 2 additions & 2 deletions ebpf/session_python.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/samber/lo"
)

func (s *session) collectPythonProfile(cb func(t *sd.Target, stack []string, value uint64, pid uint32)) error {
func (s *session) collectPythonProfile(cb CollectProfilesCallback) error {
if s.pyperf == nil {
return nil
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func (s *session) collectPythonProfile(cb func(t *sd.Target, stack []string, val
continue // only comm .. todo skip with an option
}
lo.Reverse(sb.stack)
cb(labels, sb.stack, uint64(1), event.Pid)
cb(labels, sb.stack, uint64(1), event.Pid, SampleNotAggregated)
s.collectMetrics(labels, &stats, sb)
}
if stacktraceErrors > 0 {
Expand Down
Loading