Skip to content

Commit

Permalink
fix: make pprof merge thread-safe (#3564)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae authored Sep 18, 2024
1 parent e56265f commit 1083bfd
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 22 deletions.
4 changes: 0 additions & 4 deletions pkg/frontend/frontend_select_merge_profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package frontend

import (
"context"
"sync"
"time"

"connectrpc.com/connect"
Expand Down Expand Up @@ -58,7 +57,6 @@ func (f *Frontend) SelectMergeProfile(
// the method is used for pprof export and
// truncation is not applicable for that.

var lock sync.Mutex
var m pprof.ProfileMerge
for intervals.Next() {
r := intervals.At()
Expand All @@ -77,8 +75,6 @@ func (f *Frontend) SelectMergeProfile(
if err != nil {
return err
}
lock.Lock()
defer lock.Unlock()
return m.Merge(resp.Msg)
})
}
Expand Down
6 changes: 0 additions & 6 deletions pkg/phlaredb/block_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1214,7 +1214,6 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1
deduplicationNeeded = request.Hints.Block.Deduplication
}

var lock sync.Mutex
var result pprof.ProfileMerge
g, ctx := errgroup.WithContext(ctx)

Expand All @@ -1234,9 +1233,6 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1
if err != nil {
return err
}

lock.Lock()
defer lock.Unlock()
return result.Merge(p)
}))
}
Expand Down Expand Up @@ -1271,8 +1267,6 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1
if err != nil {
return err
}
lock.Lock()
defer lock.Unlock()
return result.Merge(p)
}))
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/phlaredb/symdb/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,15 +250,12 @@ func (r *Resolver) Tree() (*model.Tree, error) {
func (r *Resolver) Pprof() (*googlev1.Profile, error) {
span, ctx := opentracing.StartSpanFromContext(r.ctx, "Resolver.Pprof")
defer span.Finish()
var lock sync.Mutex
var p pprof.ProfileMerge
err := r.withSymbols(ctx, func(symbols *Symbols, appender *SampleAppender) error {
resolved, err := symbols.Pprof(ctx, appender, r.maxNodes, SelectStackTraces(symbols, r.sts))
if err != nil {
return err
}
lock.Lock()
defer lock.Unlock()
return p.Merge(resolved)
})
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/pprof/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"hash/maphash"
"sort"
"sync"

profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
"github.com/grafana/pyroscope/pkg/slices"
Expand All @@ -23,6 +24,8 @@ import (
// reused and the number of allocs decreased.

type ProfileMerge struct {
mu sync.Mutex

profile *profilev1.Profile
tmp []uint32

Expand All @@ -36,6 +39,9 @@ type ProfileMerge struct {
// Merge adds p to the profile merge, cloning new objects.
// Profile p is modified in place but not retained by the function.
func (m *ProfileMerge) Merge(p *profilev1.Profile) error {
m.mu.Lock()
defer m.mu.Unlock()

if p == nil || len(p.Sample) == 0 || len(p.StringTable) < 2 {
return nil
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,24 +894,19 @@ func (q *Querier) selectProfile(ctx context.Context, req *querierv1.SelectMergeP
}

g, gCtx := errgroup.WithContext(ctx)
var lock sync.Mutex
var merge pprof.ProfileMerge
g.Go(func() error {
ingesterProfile, err := q.selectProfileFromIngesters(gCtx, storeQueries.ingester.MergeProfileRequest(req), plan)
if err != nil {
return err
}
lock.Lock()
defer lock.Unlock()
return merge.Merge(ingesterProfile)
})
g.Go(func() error {
storegatewayProfile, err := q.selectProfileFromStoreGateway(gCtx, storeQueries.storeGateway.MergeProfileRequest(req), plan)
if err != nil {
return err
}
lock.Lock()
defer lock.Unlock()
return merge.Merge(storegatewayProfile)
})
if err := g.Wait(); err != nil {
Expand Down
4 changes: 0 additions & 4 deletions pkg/querier/select_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,6 @@ func selectMergePprofProfile(ctx context.Context, ty *typesv1.ProfileType, respo
}

span := opentracing.SpanFromContext(ctx)
// Collects the results in parallel.
var lock sync.Mutex
var pprofMerge pprof.ProfileMerge
g, _ := errgroup.WithContext(ctx)
for _, iter := range mergeResults {
Expand All @@ -446,8 +444,6 @@ func selectMergePprofProfile(ctx context.Context, ty *typesv1.ProfileType, respo
if err = pprof.Unmarshal(result, &p); err != nil {
return err
}
lock.Lock()
defer lock.Unlock()
return pprofMerge.Merge(&p)
}))
}
Expand Down

0 comments on commit 1083bfd

Please sign in to comment.