Skip to content

Commit

Permalink
feat: compactor rewriter lru
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Apr 5, 2024
1 parent 4fdd876 commit 6078947
Showing 1 changed file with 27 additions and 19 deletions.
46 changes: 27 additions & 19 deletions pkg/phlaredb/symdb/rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import (
"math"
"sort"

lru "github.com/hashicorp/golang-lru/v2"

schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
"github.com/grafana/pyroscope/pkg/slices"
)

type Rewriter struct {
symdb *SymDB
source SymbolsReader
partitions map[uint64]*partitionRewriter
partitions *lru.Cache[uint64, *partitionRewriter]
}

func NewRewriter(w *SymDB, r SymbolsReader) *Rewriter {
Expand All @@ -38,50 +40,56 @@ func (r *Rewriter) Rewrite(partition uint64, stacktraces []uint32) error {

func (r *Rewriter) init(partition uint64) (p *partitionRewriter, err error) {
if r.partitions == nil {
r.partitions = make(map[uint64]*partitionRewriter)
r.partitions, _ = lru.NewWithEvict(8, func(_ uint64, p *partitionRewriter) {
p.reader.Release()
})
}
if p, err = r.getOrCreatePartition(partition); err != nil {
if p, err = r.getOrCreatePartitionRewriter(partition); err != nil {
return nil, err
}
return p, nil
}

func (r *Rewriter) getOrCreatePartition(partition uint64) (_ *partitionRewriter, err error) {
p, ok := r.partitions[partition]
func (r *Rewriter) getOrCreatePartitionRewriter(partition uint64) (_ *partitionRewriter, err error) {
p, ok := r.partitions.Get(partition)
if ok {
p.reset()
return p, nil
}
pr, err := r.newRewriter(partition)
if err != nil {
return nil, err
}
r.partitions.Add(partition, pr)
return pr, nil
}

n := &partitionRewriter{name: partition}
n.dst = r.symdb.PartitionWriter(partition)
// Note that the partition is not released: we want to keep
// it during the whole lifetime of the rewriter.
pr, err := r.source.Partition(context.TODO(), partition)
func (r *Rewriter) newRewriter(p uint64) (*partitionRewriter, error) {
n := &partitionRewriter{name: p}
reader, err := r.source.Partition(context.TODO(), p)
if err != nil {
return nil, err
}
n.reader = reader
n.dst = r.symdb.PartitionWriter(p)
// We clone locations, functions, and mappings,
// because these object will be modified.
n.src = cloneSymbolsPartially(pr.Symbols())
n.src = cloneSymbolsPartially(reader.Symbols())
var stats PartitionStats
pr.WriteStats(&stats)

reader.WriteStats(&stats)
n.stacktraces = newLookupTable[[]int32](stats.MaxStacktraceID)
n.locations = newLookupTable[*schemav1.InMemoryLocation](stats.LocationsTotal)
n.mappings = newLookupTable[*schemav1.InMemoryMapping](stats.MappingsTotal)
n.functions = newLookupTable[*schemav1.InMemoryFunction](stats.FunctionsTotal)
n.strings = newLookupTable[string](stats.StringsTotal)

r.partitions[partition] = n
return n, nil
}

type partitionRewriter struct {
name uint64

src *Symbols
dst *PartitionWriter
name uint64
src *Symbols
dst *PartitionWriter
reader PartitionReader

stacktraces *lookupTable[[]int32]
locations *lookupTable[*schemav1.InMemoryLocation]
Expand Down

0 comments on commit 6078947

Please sign in to comment.