Skip to content

Commit

Permalink
perf: optimize tree processing (#3386)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae authored Jun 26, 2024
1 parent c0fb2b0 commit b62930e
Show file tree
Hide file tree
Showing 22 changed files with 1,039 additions and 340 deletions.
27 changes: 16 additions & 11 deletions pkg/model/stacktraces.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package model

import (
"bytes"
"container/heap"
"io"
"sync"
"unsafe"

ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
"github.com/grafana/pyroscope/pkg/og/util/varint"
"github.com/grafana/pyroscope/pkg/util/minheap"
)

// TODO(kolesnikovae): Remove support for StacktracesMergeFormat_MERGE_FORMAT_STACKTRACES.
Expand Down Expand Up @@ -213,26 +213,24 @@ func (t *StacktraceTree) LookupLocations(dst []uint64, idx int32) []uint64 {

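// MinValue returns the minimum "total" value a node in a tree has to have
// in order to be among the maxNodes nodes with the largest totals. It returns
// 0 if maxNodes is less than 1 or not smaller than the number of nodes.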
func (t *StacktraceTree) MinValue(maxNodes int64) int64 {
// TODO(kolesnikovae): Consider quickselect.
if maxNodes < 1 || maxNodes >= int64(len(t.Nodes)) {
return 0
}
s := make(minHeap, 0, maxNodes)
h := &s
h := make([]int64, 0, maxNodes)
for _, n := range t.Nodes {
if h.Len() >= int(maxNodes) {
if n.Total > (*h)[0] {
heap.Pop(h)
if len(h) >= int(maxNodes) {
if n.Total > h[0] {
h = minheap.Pop(h)
} else {
continue
}
}
heap.Push(h, n.Total)
h = minheap.Push(h, n.Total)
}
if h.Len() < int(maxNodes) {
if len(h) < int(maxNodes) {
return 0
}
return (*h)[0]
return h[0]
}

type StacktraceTreeTraverseFn = func(index int32, children []int32) error
Expand Down Expand Up @@ -337,6 +335,7 @@ func (t *StacktraceTree) Tree(maxNodes int64, names []string) *Tree {
var name string
if sn.Location < 0 {
name = truncatedNodeName
sn.Total = sn.Value
} else {
name = names[sn.Location]
}
Expand All @@ -350,5 +349,11 @@ func (t *StacktraceTree) Tree(maxNodes int64, names []string) *Tree {
return nil
})

return &Tree{root: root.children[0].children}
// Roots should not have parents.
s := root.children[0].children
for _, n := range s {
n.parent.parent = nil
}

return &Tree{root: s}
}
65 changes: 19 additions & 46 deletions pkg/model/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package model

import (
"bytes"
"container/heap"
"fmt"
"io"
"sort"
Expand All @@ -14,6 +13,7 @@ import (

"github.com/grafana/pyroscope/pkg/og/util/varint"
"github.com/grafana/pyroscope/pkg/slices"
"github.com/grafana/pyroscope/pkg/util/minheap"
)

type Tree struct {
Expand Down Expand Up @@ -262,30 +262,29 @@ func (t *Tree) minValue(maxNodes int64) int64 {
return 0
}

s := make(minHeap, 0, maxNodes)
h := &s
h := make([]int64, 0, maxNodes)

nodes = append(nodes[:0], t.root...)
var n *node
for len(nodes) > 0 {
last := len(nodes) - 1
n, nodes = nodes[last], nodes[:last]
if h.Len() >= int(maxNodes) {
if n.total > (*h)[0] {
heap.Pop(h)
if len(h) >= int(maxNodes) {
if n.total > h[0] {
h = minheap.Pop(h)
} else {
continue
}
}
heap.Push(h, n.total)
h = minheap.Push(h, n.total)
nodes = append(nodes, n.children...)
}

if h.Len() < int(maxNodes) {
if len(h) < int(maxNodes) {
return 0
}

return (*h)[0]
return h[0]
}

// size reports number of nodes the tree consists of.
Expand All @@ -303,36 +302,6 @@ func (t *Tree) size(buf []*node) int64 {
return s
}

// minHeap is a slice-backed min-heap of int64 values. It provides the
// methods required by heap.Interface and is meant to be driven through
// the container/heap package functions.
type minHeap []int64

// Len reports how many elements the heap currently holds.
func (h minHeap) Len() int {
	return len(h)
}

// Less reports whether the element at index i is strictly smaller than
// the element at index j; this ordering is what keeps the smallest value
// at index 0 when the heap is maintained by container/heap.
func (h minHeap) Less(i, j int) bool {
	return h[j] > h[i]
}

// Swap exchanges the elements stored at indexes i and j.
func (h minHeap) Swap(i, j int) {
	a, b := h[i], h[j]
	h[i], h[j] = b, a
}

// Push appends x to the end of the underlying slice. x must hold an
// int64: any other dynamic type makes the type assertion panic.
func (h *minHeap) Push(x interface{}) {
	v := x.(int64)
	*h = append(*h, v)
}

// Pop removes the last element of the underlying slice and returns it.
// container/heap moves the minimum to that position before calling this
// method, which is why heap.Pop yields the smallest value.
func (h *minHeap) Pop() interface{} {
	s := *h
	last := len(s) - 1
	v := s[last]
	*h = s[:last]
	return v
}

const truncatedNodeName = "other"

var truncatedNodeNameBytes = []byte(truncatedNodeName)
Expand Down Expand Up @@ -473,6 +442,16 @@ func NewTreeMerger() *TreeMerger {
return new(TreeMerger)
}

// MergeTree folds t into the tree accumulated by the merger while holding
// m.mu. The first tree received is stored as is (no copy is made); every
// subsequent tree is merged into the stored one with Tree.Merge.
func (m *TreeMerger) MergeTree(t *Tree) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.t == nil {
		// Nothing accumulated yet: adopt t as the accumulator.
		m.t = t
		return
	}
	m.t.Merge(t)
}

func (m *TreeMerger) MergeTreeBytes(b []byte) error {
// TODO(kolesnikovae): Ideally, we should not have
// the intermediate tree t but update m.t reading
Expand All @@ -481,13 +460,7 @@ func (m *TreeMerger) MergeTreeBytes(b []byte) error {
if err != nil {
return err
}
m.mu.Lock()
if m.t != nil {
m.t.Merge(t)
} else {
m.t = t
}
m.mu.Unlock()
m.MergeTree(t)
return nil
}

Expand Down
22 changes: 7 additions & 15 deletions pkg/phlaredb/sample_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,21 +150,13 @@ func mergeBySpans[T interface{ StacktracePartition() uint64 }](ctx context.Conte
defer runutil.CloseWithErrCapture(&err, profiles, "failed to close profile stream")
for profiles.Next() {
p := profiles.At()
partition := p.Row.StacktracePartition()
stacktraces := p.Values[0]
values := p.Values[1]
spans := p.Values[2]
r.WithPartitionSamples(partition, func(samples map[uint32]int64) {
for i, sid := range stacktraces {
spanID := spans[i].Uint64()
if spanID == 0 {
continue
}
if _, ok := spanSelector[spanID]; ok {
samples[sid.Uint32()] += values[i].Int64()
}
}
})
r.AddSamplesWithSpanSelectorFromParquetRow(
p.Row.StacktracePartition(),
p.Values[0],
p.Values[1],
p.Values[2],
spanSelector,
)
}
return profiles.Err()
}
Expand Down
57 changes: 18 additions & 39 deletions pkg/phlaredb/schemas/v1/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,12 @@ type Samples struct {

func NewSamples(size int) Samples {
return Samples{
StacktraceIDs: make([]uint32, 0, size),
Values: make([]uint64, 0, size),
StacktraceIDs: make([]uint32, size),
Values: make([]uint64, size),
}
}

func NewSamplesFromMap(m map[uint32]int64) Samples {
func NewSamplesFromMap(m map[uint32]uint64) Samples {
s := Samples{
StacktraceIDs: make([]uint32, len(m)),
Values: make([]uint64, len(m)),
Expand All @@ -339,7 +339,7 @@ func NewSamplesFromMap(m map[uint32]int64) Samples {
for k, v := range m {
if k != 0 && v > 0 {
s.StacktraceIDs[i] = k
s.Values[i] = uint64(v)
s.Values[i] = v
i++
}
}
Expand All @@ -364,6 +364,20 @@ func (s Samples) Clone() Samples {
return cloneSamples(s)
}

// Range returns the samples in the half-open index range [n, m).
// The returned slices are sub-slices of the fields of s, not copies.
// Spans are included only when s has any. A zero Samples value is
// returned when the bounds are invalid, that is when n is negative,
// n is greater than m, or m is greater than s.Len().
func (s Samples) Range(n, m int) Samples {
	inBounds := n >= 0 && n <= m && m <= s.Len()
	if !inBounds {
		return Samples{}
	}
	var out Samples
	out.StacktraceIDs = s.StacktraceIDs[n:m]
	out.Values = s.Values[n:m]
	if len(s.Spans) != 0 {
		out.Spans = s.Spans[n:m]
	}
	return out
}

func trimDuplicateSamples(samples Samples) Samples {
sort.Sort(samples)
n := 0
Expand Down Expand Up @@ -454,41 +468,6 @@ func (s Samples) Sum() uint64 {
return sum
}

// TODO(kolesnikovae): Consider map alternatives.

// SampleMap holds sample values grouped by partition, structured
// as follows: partition => stacktrace_id => value
type SampleMap map[uint64]map[uint32]int64

// Partition returns the stacktrace_id => value map stored for partition p.
// If m has no entry for p, a new map (pre-sized for 128 entries) is created,
// stored in m under p, and returned.
func (m SampleMap) Partition(p uint64) map[uint32]int64 {
	if existing, found := m[p]; found {
		return existing
	}
	created := make(map[uint32]int64, 128)
	m[p] = created
	return created
}

// AddSamples adds every value in samples to the entry of the given
// partition keyed by the corresponding stacktrace ID. The partition map
// is obtained through Partition before the samples are iterated.
// Values are converted from uint64 to int64 before being accumulated.
func (m SampleMap) AddSamples(partition uint64, samples Samples) {
	dst := m.Partition(partition)
	ids := samples.StacktraceIDs
	for i := range ids {
		dst[ids[i]] += int64(samples.Values[i])
	}
}

// WriteSamples overwrites dst.StacktraceIDs and dst.Values with the
// contents of the given partition, reusing the capacity of the existing
// slices. If m has no entry for the partition, dst is left untouched.
// Entries are appended in map iteration order, so the output order is
// unspecified.
func (m SampleMap) WriteSamples(partition uint64, dst *Samples) {
	src, found := m[partition]
	if !found {
		return
	}
	// Truncate to zero length but keep the backing arrays for reuse.
	ids, values := dst.StacktraceIDs[:0], dst.Values[:0]
	for sid, v := range src {
		ids = append(ids, sid)
		values = append(values, uint64(v))
	}
	dst.StacktraceIDs, dst.Values = ids, values
}

const profileSize = uint64(unsafe.Sizeof(InMemoryProfile{}))

func (p InMemoryProfile) Size() uint64 {
Expand Down
Loading

0 comments on commit b62930e

Please sign in to comment.