Skip to content

Commit

Permalink
fix: discard invalid samples at read (#3193)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae authored Apr 12, 2024
1 parent cc98cb4 commit ff97953
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 6 deletions.
10 changes: 7 additions & 3 deletions pkg/phlaredb/schemas/v1/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,14 @@ func NewSamplesFromMap(m map[uint32]int64) Samples {
}
var i int
for k, v := range m {
s.StacktraceIDs[i] = k
s.Values[i] = uint64(v)
i++
if k != 0 && v > 0 {
s.StacktraceIDs[i] = k
s.Values[i] = uint64(v)
i++
}
}
s.StacktraceIDs = s.StacktraceIDs[:i]
s.Values = s.Values[:i]
sort.Sort(s)
return s
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/phlaredb/schemas/v1/profiles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/google/uuid"
"github.com/parquet-go/parquet-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

phlareparquet "github.com/grafana/pyroscope/pkg/parquet"
Expand Down Expand Up @@ -440,3 +441,15 @@ func generateSamples(n int) []*Sample {
}
return samples
}

func Test_SamplesFromMap(t *testing.T) {
m := map[uint32]int64{
1: 2,
0: 0,
2: 3,
3: -1,
}
samples := NewSamplesFromMap(m)
assert.Equal(t, len(m), cap(samples.Values))
assert.Equal(t, 2, len(samples.Values))
}
11 changes: 8 additions & 3 deletions pkg/phlaredb/symdb/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,17 @@ func (r *Resolver) AddSamples(partition uint64, s schemav1.Samples) {
func (r *Resolver) AddSamplesFromParquetRow(partition uint64, stacktraceIDs, values []parquet.Value) {
r.WithPartitionSamples(partition, func(samples map[uint32]int64) {
for i, sid := range stacktraceIDs {
samples[sid.Uint32()] += values[i].Int64()
if s := sid.Uint32(); s > 0 {
samples[s] += values[i].Int64()
}
}
})
}

func (r *Resolver) AddSamplesWithSpanSelector(partition uint64, s schemav1.Samples, spanSelector model.SpanSelector) {
r.WithPartitionSamples(partition, func(samples map[uint32]int64) {
for i, sid := range s.StacktraceIDs {
if _, ok := spanSelector[s.Spans[i]]; ok {
if _, ok := spanSelector[s.Spans[i]]; ok && sid > 0 {
samples[sid] += int64(s.Values[i])
}
}
Expand Down Expand Up @@ -247,7 +249,10 @@ func (r *Resolver) Pprof() (*googlev1.Profile, error) {
defer lock.Unlock()
return p.Merge(resolved)
})
return p.Profile(), err
if err != nil {
return nil, err
}
return p.Profile(), nil
}

func (r *Resolver) withSymbols(ctx context.Context, fn func(*Symbols, schemav1.Samples) error) error {
Expand Down

0 comments on commit ff97953

Please sign in to comment.