diff --git a/pkg/frontend/frontend_select_merge_profile.go b/pkg/frontend/frontend_select_merge_profile.go index e256ed6402..6d716bde7e 100644 --- a/pkg/frontend/frontend_select_merge_profile.go +++ b/pkg/frontend/frontend_select_merge_profile.go @@ -2,7 +2,6 @@ package frontend import ( "context" - "sync" "time" "connectrpc.com/connect" @@ -58,7 +57,6 @@ func (f *Frontend) SelectMergeProfile( // the method is used for pprof export and // truncation is not applicable for that. - var lock sync.Mutex var m pprof.ProfileMerge for intervals.Next() { r := intervals.At() @@ -77,8 +75,6 @@ func (f *Frontend) SelectMergeProfile( if err != nil { return err } - lock.Lock() - defer lock.Unlock() return m.Merge(resp.Msg) }) } diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index dcdfe9cdcf..0349e426b6 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -1214,7 +1214,6 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1 deduplicationNeeded = request.Hints.Block.Deduplication } - var lock sync.Mutex var result pprof.ProfileMerge g, ctx := errgroup.WithContext(ctx) @@ -1234,9 +1233,6 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1 if err != nil { return err } - - lock.Lock() - defer lock.Unlock() return result.Merge(p) })) } @@ -1271,8 +1267,6 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1 if err != nil { return err } - lock.Lock() - defer lock.Unlock() return result.Merge(p) })) } diff --git a/pkg/phlaredb/symdb/resolver.go b/pkg/phlaredb/symdb/resolver.go index 3726068633..aa5438577a 100644 --- a/pkg/phlaredb/symdb/resolver.go +++ b/pkg/phlaredb/symdb/resolver.go @@ -250,15 +250,12 @@ func (r *Resolver) Tree() (*model.Tree, error) { func (r *Resolver) Pprof() (*googlev1.Profile, error) { span, ctx := opentracing.StartSpanFromContext(r.ctx, "Resolver.Pprof") defer span.Finish() - var lock sync.Mutex var p pprof.ProfileMerge err := r.withSymbols(ctx, func(symbols *Symbols, appender *SampleAppender) error { resolved, err := symbols.Pprof(ctx, appender, r.maxNodes, SelectStackTraces(symbols, r.sts)) if err != nil { return err } - lock.Lock() - defer lock.Unlock() return p.Merge(resolved) }) if err != nil { diff --git a/pkg/pprof/merge.go b/pkg/pprof/merge.go index f162f32990..8cb42e9fa1 100644 --- a/pkg/pprof/merge.go +++ b/pkg/pprof/merge.go @@ -4,6 +4,7 @@ import ( "fmt" "hash/maphash" "sort" + "sync" profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1" "github.com/grafana/pyroscope/pkg/slices" @@ -23,6 +24,8 @@ import ( // reused and the number of allocs decreased. type ProfileMerge struct { + mu sync.Mutex + profile *profilev1.Profile tmp []uint32 @@ -36,6 +39,9 @@ type ProfileMerge struct { // Merge adds p to the profile merge, cloning new objects. // Profile p is modified in place but not retained by the function. func (m *ProfileMerge) Merge(p *profilev1.Profile) error { + m.mu.Lock() + defer m.mu.Unlock() + if p == nil || len(p.Sample) == 0 || len(p.StringTable) < 2 { return nil } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 510a9aa2cb..ddc21d962d 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -894,15 +894,12 @@ func (q *Querier) selectProfile(ctx context.Context, req *querierv1.SelectMergeP } g, gCtx := errgroup.WithContext(ctx) - var lock sync.Mutex var merge pprof.ProfileMerge g.Go(func() error { ingesterProfile, err := q.selectProfileFromIngesters(gCtx, storeQueries.ingester.MergeProfileRequest(req), plan) if err != nil { return err } - lock.Lock() - defer lock.Unlock() return merge.Merge(ingesterProfile) }) g.Go(func() error { @@ -910,8 +907,6 @@ func (q *Querier) selectProfile(ctx context.Context, req *querierv1.SelectMergeP if err != nil { return err } - lock.Lock() - defer lock.Unlock() return merge.Merge(storegatewayProfile) }) if err := g.Wait(); err != nil { diff --git a/pkg/querier/select_merge.go b/pkg/querier/select_merge.go index 4460342638..bcf6d1f29d 100644 --- a/pkg/querier/select_merge.go +++ b/pkg/querier/select_merge.go @@ -424,8 +424,6 @@ func selectMergePprofProfile(ctx context.Context, ty *typesv1.ProfileType, respo } span := opentracing.SpanFromContext(ctx) - // Collects the results in parallel. - var lock sync.Mutex var pprofMerge pprof.ProfileMerge g, _ := errgroup.WithContext(ctx) for _, iter := range mergeResults { @@ -446,8 +444,6 @@ func selectMergePprofProfile(ctx context.Context, ty *typesv1.ProfileType, respo if err = pprof.Unmarshal(result, &p); err != nil { return err } - lock.Lock() - defer lock.Unlock() return pprofMerge.Merge(&p) })) }