Skip to content

Commit

Permalink
feat: compactor rewriter LRU cache (#3165)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae authored Apr 8, 2024
1 parent 94564fa commit f63de9b
Show file tree
Hide file tree
Showing 12 changed files with 35 additions and 227 deletions.
5 changes: 1 addition & 4 deletions cmd/profilecli/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,7 @@ func compact(ctx context.Context, src, dst string, metas []*block.Meta, shards i
for _, b := range blocks {
b := b
g.Go(func() error {
if err := b.Open(groupCtx); err != nil {
return err
}
return b.Symbols().Load(groupCtx)
return b.Open(groupCtx)
})
}
if err := g.Wait(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
github.com/grafana/river v0.3.0
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/json-iterator/go v1.1.12
github.com/k0kubun/pp/v3 v3.2.0
github.com/klauspost/compress v1.17.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.6.0 h1:uL2shRDx7RTrOrTCUZEGP/wJUFiUI8QT6E7z5o8jga4=
github.com/hashicorp/golang-lru v0.6.0/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=
github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc=
github.com/hashicorp/nomad/api v0.0.0-20230721134942-515895c7690c h1:Nc3Mt2BAnq0/VoLEntF/nipX+K1S7pG+RgwiitSv6v0=
Expand Down
6 changes: 0 additions & 6 deletions pkg/compactor/bucket_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,12 +366,6 @@ func (c *BlockCompactor) CompactWithSplitting(ctx context.Context, dest string,
if err := b.Open(ctx); err != nil {
return errors.Wrapf(err, "open block %s", meta.ULID)
}
// Only load symbols if we are splitting.
if shardCount > 1 {
if err = b.Symbols().Load(ctx); err != nil {
return errors.Wrapf(err, "error loading symbols")
}
}
readers[idx] = b
return nil
})
Expand Down
13 changes: 0 additions & 13 deletions pkg/phlaredb/block_querier_symbols.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/runutil"
"github.com/opentracing/opentracing-go"
"github.com/parquet-go/parquet-go"
"golang.org/x/sync/errgroup"

Expand All @@ -28,7 +27,6 @@ import (
type symbolsResolver interface {
symdb.SymbolsReader
io.Closer
Load(context.Context) error
}

type symbolsResolverV1 struct {
Expand All @@ -54,11 +52,6 @@ func newSymbolsResolverV1(ctx context.Context, bucketReader phlareobj.Bucket, me
return r, err
}

func (r *symbolsResolverV1) Load(_ context.Context) error {
// Unsupported.
return nil
}

func (r *symbolsResolverV1) Close() error {
return multierror.New(
r.stacktraces.Close(),
Expand Down Expand Up @@ -141,12 +134,6 @@ func newSymbolsResolverV2(ctx context.Context, b phlareobj.Bucket, meta *block.M
return &r, err
}

func (r *symbolsResolverV2) Load(ctx context.Context) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "symbols.Load")
defer sp.Finish()
return r.symbols.Load(ctx)
}

func (r *symbolsResolverV2) Close() error {
err := multierror.New()
if r.symbols != nil {
Expand Down
2 changes: 0 additions & 2 deletions pkg/phlaredb/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,6 @@ func newBlock(t testing.TB, generator func() []*testhelper.ProfileBuilder) *sing
}
blk := NewSingleBlockQuerierFromMeta(ctx, bkt, meta)
require.NoError(t, blk.Open(ctx))
require.NoError(t, blk.symbols.Load(ctx))
return blk
}

Expand Down Expand Up @@ -903,7 +902,6 @@ func Benchmark_CompactSplit(b *testing.B) {
require.NoError(b, err)
bl := NewSingleBlockQuerierFromMeta(ctx, bkt, meta)
require.NoError(b, bl.Open(ctx))
require.NoError(b, bl.Symbols().Load(ctx))
dst := b.TempDir()

b.ResetTimer()
Expand Down
14 changes: 3 additions & 11 deletions pkg/phlaredb/symdb/block_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ type Reader struct {
partitions []*partition
partitionsMap map[uint64]*partition

// Indicates whether the block reader was loaded.
// Loaded partitions are not released.
loaded bool

locations parquetobj.File
mappings parquetobj.File
functions parquetobj.File
Expand Down Expand Up @@ -196,10 +192,8 @@ func (r *Reader) partition(ctx context.Context, partition uint64) (*partition, e
if !ok {
return nil, ErrPartitionNotFound
}
if !r.loaded {
if err := p.init(ctx); err != nil {
return nil, err
}
if err := p.init(ctx); err != nil {
return nil, err
}
return p, nil
}
Expand All @@ -219,9 +213,7 @@ func (p *partition) init(ctx context.Context) (err error) {
}

func (p *partition) Release() {
if !p.reader.loaded {
p.tx().release()
}
p.tx().release()
}

func (p *partition) tx() *fetchTx {
Expand Down
118 changes: 0 additions & 118 deletions pkg/phlaredb/symdb/block_reader_load.go

This file was deleted.

36 changes: 0 additions & 36 deletions pkg/phlaredb/symdb/block_reader_load_test.go

This file was deleted.

5 changes: 0 additions & 5 deletions pkg/phlaredb/symdb/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,6 @@ func (m *mockSymbolsReader) Partition(ctx context.Context, partition uint64) (Pa
return r, args.Error(1)
}

func (m *mockSymbolsReader) Load(ctx context.Context) error {
args := m.Called(ctx)
return args.Error(0)
}

type fakeContext struct {
context.Context
once sync.Once
Expand Down
Loading

0 comments on commit f63de9b

Please sign in to comment.