Skip to content

Commit

Permalink
feat: compactor rewriter lru
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Apr 5, 2024
1 parent 4fdd876 commit 6159263
Show file tree
Hide file tree
Showing 12 changed files with 34 additions and 223 deletions.
5 changes: 1 addition & 4 deletions cmd/profilecli/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,7 @@ func compact(ctx context.Context, src, dst string, metas []*block.Meta, shards i
for _, b := range blocks {
b := b
g.Go(func() error {
if err := b.Open(groupCtx); err != nil {
return err
}
return b.Symbols().Load(groupCtx)
return b.Open(groupCtx)
})
}
if err := g.Wait(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
github.com/grafana/river v0.3.0
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/json-iterator/go v1.1.12
github.com/k0kubun/pp/v3 v3.2.0
github.com/klauspost/compress v1.17.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.6.0 h1:uL2shRDx7RTrOrTCUZEGP/wJUFiUI8QT6E7z5o8jga4=
github.com/hashicorp/golang-lru v0.6.0/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=
github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc=
github.com/hashicorp/nomad/api v0.0.0-20230721134942-515895c7690c h1:Nc3Mt2BAnq0/VoLEntF/nipX+K1S7pG+RgwiitSv6v0=
Expand Down
6 changes: 0 additions & 6 deletions pkg/compactor/bucket_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,12 +366,6 @@ func (c *BlockCompactor) CompactWithSplitting(ctx context.Context, dest string,
if err := b.Open(ctx); err != nil {
return errors.Wrapf(err, "open block %s", meta.ULID)
}
// Only load symbols if we are splitting.
if shardCount > 1 {
if err = b.Symbols().Load(ctx); err != nil {
return errors.Wrapf(err, "error loading symbols")
}
}
readers[idx] = b
return nil
})
Expand Down
13 changes: 0 additions & 13 deletions pkg/phlaredb/block_querier_symbols.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/runutil"
"github.com/opentracing/opentracing-go"
"github.com/parquet-go/parquet-go"
"golang.org/x/sync/errgroup"

Expand All @@ -28,7 +27,6 @@ import (
type symbolsResolver interface {
symdb.SymbolsReader
io.Closer
Load(context.Context) error
}

type symbolsResolverV1 struct {
Expand All @@ -54,11 +52,6 @@ func newSymbolsResolverV1(ctx context.Context, bucketReader phlareobj.Bucket, me
return r, err
}

func (r *symbolsResolverV1) Load(_ context.Context) error {
// Unsupported.
return nil
}

func (r *symbolsResolverV1) Close() error {
return multierror.New(
r.stacktraces.Close(),
Expand Down Expand Up @@ -141,12 +134,6 @@ func newSymbolsResolverV2(ctx context.Context, b phlareobj.Bucket, meta *block.M
return &r, err
}

func (r *symbolsResolverV2) Load(ctx context.Context) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "symbols.Load")
defer sp.Finish()
return r.symbols.Load(ctx)
}

func (r *symbolsResolverV2) Close() error {
err := multierror.New()
if r.symbols != nil {
Expand Down
2 changes: 0 additions & 2 deletions pkg/phlaredb/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,6 @@ func newBlock(t testing.TB, generator func() []*testhelper.ProfileBuilder) *sing
}
blk := NewSingleBlockQuerierFromMeta(ctx, bkt, meta)
require.NoError(t, blk.Open(ctx))
require.NoError(t, blk.symbols.Load(ctx))
return blk
}

Expand Down Expand Up @@ -903,7 +902,6 @@ func Benchmark_CompactSplit(b *testing.B) {
require.NoError(b, err)
bl := NewSingleBlockQuerierFromMeta(ctx, bkt, meta)
require.NoError(b, bl.Open(ctx))
require.NoError(b, bl.Symbols().Load(ctx))
dst := b.TempDir()

b.ResetTimer()
Expand Down
14 changes: 3 additions & 11 deletions pkg/phlaredb/symdb/block_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ type Reader struct {
partitions []*partition
partitionsMap map[uint64]*partition

// Indicates whether the block reader was loaded.
// Loaded partitions are not released.
loaded bool

locations parquetobj.File
mappings parquetobj.File
functions parquetobj.File
Expand Down Expand Up @@ -196,10 +192,8 @@ func (r *Reader) partition(ctx context.Context, partition uint64) (*partition, e
if !ok {
return nil, ErrPartitionNotFound
}
if !r.loaded {
if err := p.init(ctx); err != nil {
return nil, err
}
if err := p.init(ctx); err != nil {
return nil, err
}
return p, nil
}
Expand All @@ -219,9 +213,7 @@ func (p *partition) init(ctx context.Context) (err error) {
}

func (p *partition) Release() {
if !p.reader.loaded {
p.tx().release()
}
p.tx().release()
}

func (p *partition) tx() *fetchTx {
Expand Down
118 changes: 0 additions & 118 deletions pkg/phlaredb/symdb/block_reader_load.go

This file was deleted.

36 changes: 0 additions & 36 deletions pkg/phlaredb/symdb/block_reader_load_test.go

This file was deleted.

5 changes: 0 additions & 5 deletions pkg/phlaredb/symdb/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,6 @@ func (m *mockSymbolsReader) Partition(ctx context.Context, partition uint64) (Pa
return r, args.Error(1)
}

func (m *mockSymbolsReader) Load(ctx context.Context) error {
args := m.Called(ctx)
return args.Error(0)
}

type fakeContext struct {
context.Context
once sync.Once
Expand Down
49 changes: 27 additions & 22 deletions pkg/phlaredb/symdb/rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import (
"math"
"sort"

lru "github.com/hashicorp/golang-lru/v2"

schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
"github.com/grafana/pyroscope/pkg/slices"
)

type Rewriter struct {
symdb *SymDB
source SymbolsReader
partitions map[uint64]*partitionRewriter
partitions *lru.Cache[uint64, *partitionRewriter]
}

func NewRewriter(w *SymDB, r SymbolsReader) *Rewriter {
Expand All @@ -38,50 +40,53 @@ func (r *Rewriter) Rewrite(partition uint64, stacktraces []uint32) error {

func (r *Rewriter) init(partition uint64) (p *partitionRewriter, err error) {
if r.partitions == nil {
r.partitions = make(map[uint64]*partitionRewriter)
}
if p, err = r.getOrCreatePartition(partition); err != nil {
return nil, err
r.partitions, _ = lru.NewWithEvict(2, func(_ uint64, p *partitionRewriter) {
p.reader.Release()
})
}
return p, nil
return r.getOrCreatePartitionRewriter(partition)
}

func (r *Rewriter) getOrCreatePartition(partition uint64) (_ *partitionRewriter, err error) {
p, ok := r.partitions[partition]
func (r *Rewriter) getOrCreatePartitionRewriter(partition uint64) (_ *partitionRewriter, err error) {
p, ok := r.partitions.Get(partition)
if ok {
p.reset()
return p, nil
}
pr, err := r.newRewriter(partition)
if err != nil {
return nil, err
}
r.partitions.Add(partition, pr)
return pr, nil
}

n := &partitionRewriter{name: partition}
n.dst = r.symdb.PartitionWriter(partition)
// Note that the partition is not released: we want to keep
// it during the whole lifetime of the rewriter.
pr, err := r.source.Partition(context.TODO(), partition)
func (r *Rewriter) newRewriter(p uint64) (*partitionRewriter, error) {
n := &partitionRewriter{name: p}
reader, err := r.source.Partition(context.TODO(), p)
if err != nil {
return nil, err
}
n.reader = reader
n.dst = r.symdb.PartitionWriter(p)
// We clone locations, functions, and mappings,
// because these objects will be modified.
n.src = cloneSymbolsPartially(pr.Symbols())
n.src = cloneSymbolsPartially(reader.Symbols())
var stats PartitionStats
pr.WriteStats(&stats)

reader.WriteStats(&stats)
n.stacktraces = newLookupTable[[]int32](stats.MaxStacktraceID)
n.locations = newLookupTable[*schemav1.InMemoryLocation](stats.LocationsTotal)
n.mappings = newLookupTable[*schemav1.InMemoryMapping](stats.MappingsTotal)
n.functions = newLookupTable[*schemav1.InMemoryFunction](stats.FunctionsTotal)
n.strings = newLookupTable[string](stats.StringsTotal)

r.partitions[partition] = n
return n, nil
}

type partitionRewriter struct {
name uint64

src *Symbols
dst *PartitionWriter
name uint64
src *Symbols
dst *PartitionWriter
reader PartitionReader

stacktraces *lookupTable[[]int32]
locations *lookupTable[*schemav1.InMemoryLocation]
Expand Down
Loading

0 comments on commit 6159263

Please sign in to comment.