Skip to content

Commit

Permalink
feat: symdb custom binary format (#3138)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae authored Jun 26, 2024
1 parent 0d7e66a commit 359ecf1
Show file tree
Hide file tree
Showing 41 changed files with 3,301 additions and 868 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@ the block there are multiple files:

* `profiles.parquet` [parquet] table that contains profiles.

* `symbols` sub-directory contains profiling symbols that provide a link between
the compiled or interpreted binary code and the original source code:
  - An `index.symdb` file with meta information, which helps to find symbols for a specific profile.
- A `stacktraces.symdb` file contains stack traces compacted in the [parent pointer tree].
- Parquet tables for models referenced by stack traces:
`locations.parquet`, `functions.parquet`, `mappings.parquet`, `strings.parquet`.
* A `symbols.symdb` file that contains symbolic information for the profiles stored in the block.

## Data model

Expand Down
2 changes: 1 addition & 1 deletion pkg/phlaredb/block/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (stats MetaStats) ConvertToBlockStats() *ingestv1.BlockStats {
indexBytes = f.SizeBytes
} else if f.RelPath == "profiles.parquet" {
profileBytes += f.SizeBytes
} else if strings.HasPrefix(f.RelPath, "symbols") {
} else if strings.HasPrefix(f.RelPath, "symbols") || filepath.Ext(f.RelPath) == ".symdb" {
symbolBytes += f.SizeBytes
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/phlaredb/block_querier_symbols.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,10 @@ func (p *symbolsPartition) Release() {
}

// inMemoryParquetTables bundles the in-memory parquet readers for the
// symbol tables referenced by stack traces. The persister type
// parameters are value types (not pointers), matching the value-based
// persister implementations in schemav1.
type inMemoryParquetTables struct {
	strings   inMemoryparquetReader[string, schemav1.StringPersister]
	functions inMemoryparquetReader[schemav1.InMemoryFunction, schemav1.FunctionPersister]
	locations inMemoryparquetReader[schemav1.InMemoryLocation, schemav1.LocationPersister]
	mappings  inMemoryparquetReader[schemav1.InMemoryMapping, schemav1.MappingPersister]
}

func openInMemoryParquetTables(ctx context.Context, r phlareobj.BucketReader, meta *block.Meta) (*inMemoryParquetTables, error) {
Expand Down
28 changes: 25 additions & 3 deletions pkg/phlaredb/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func CompactWithSplitting(ctx context.Context, opts CompactWithSplittingOpts) (
srcMetas[i] = b.Meta()
}

symbolsCompactor := newSymbolsCompactor(opts.Dst)
symbolsCompactor := newSymbolsCompactor(opts.Dst, symdb.FormatV2)
defer runutil.CloseWithLogOnErr(util.Logger, symbolsCompactor, "close symbols compactor")

outMeta := compactMetas(srcMetas...)
Expand Down Expand Up @@ -725,6 +725,7 @@ func (it *dedupeProfileRowIterator) Next() bool {
}

type symbolsCompactor struct {
version symdb.FormatVersion
rewriters map[BlockReader]*symdb.Rewriter
w *symdb.SymDB
stacktraces []uint32
Expand All @@ -733,10 +734,22 @@ type symbolsCompactor struct {
flushed bool
}

func newSymbolsCompactor(path string) *symbolsCompactor {
func newSymbolsCompactor(path string, version symdb.FormatVersion) *symbolsCompactor {
if version == symdb.FormatV3 {
return &symbolsCompactor{
version: version,
w: symdb.NewSymDB(symdb.DefaultConfig().
WithVersion(symdb.FormatV3).
WithDirectory(path)),
dst: path,
rewriters: make(map[BlockReader]*symdb.Rewriter),
}
}
dst := filepath.Join(path, symdb.DefaultDirName)
return &symbolsCompactor{
version: symdb.FormatV2,
w: symdb.NewSymDB(symdb.DefaultConfig().
WithVersion(symdb.FormatV2).
WithDirectory(dst).
WithParquetConfig(symdb.ParquetConfig{
MaxBufferRowCount: defaultParquetConfig.MaxBufferRowCount,
Expand Down Expand Up @@ -772,7 +785,13 @@ func (s *symbolsRewriter) Close() (uint64, error) {
if err := s.symbolsCompactor.Flush(); err != nil {
return 0, err
}
return s.numSamples, util.CopyDir(s.symbolsCompactor.dst, filepath.Join(s.dst, symdb.DefaultDirName))
if s.version == symdb.FormatV3 {
dst := filepath.Join(s.dst, symdb.DefaultFileName)
src := filepath.Join(s.symbolsCompactor.dst, symdb.DefaultFileName)
return s.numSamples, util.CopyFile(src, dst)
} else {
return s.numSamples, util.CopyDir(s.symbolsCompactor.dst, filepath.Join(s.dst, symdb.DefaultDirName))
}
}

func (s *symbolsCompactor) ReWriteRow(profile profileRow) (uint64, error) {
Expand Down Expand Up @@ -814,6 +833,9 @@ func (s *symbolsCompactor) Flush() error {
}

// Close removes the compactor's on-disk output: for the v3 format a
// single symbols file inside s.dst, otherwise the whole s.dst directory.
func (s *symbolsCompactor) Close() error {
	if s.version != symdb.FormatV3 {
		return os.RemoveAll(s.dst)
	}
	return os.RemoveAll(filepath.Join(s.dst, symdb.DefaultFileName))
}

Expand Down
1 change: 0 additions & 1 deletion pkg/phlaredb/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,6 @@ func blockQuerierFromMeta(t *testing.T, dir string, m block.Meta) *singleBlockQu
require.NoError(t, err)
blk := NewSingleBlockQuerierFromMeta(ctx, bkt, &m)
require.NoError(t, blk.Open(ctx))
// require.NoError(t, blk.symbols.Load(ctx))
return blk
}

Expand Down
20 changes: 15 additions & 5 deletions pkg/phlaredb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,19 @@ func NewHead(phlarectx context.Context, cfg Config, limiter TenantLimiter) (*Hea
}
}

h.symdb = symdb.NewSymDB(symdb.DefaultConfig().
WithDirectory(filepath.Join(h.headPath, symdb.DefaultDirName)).
WithParquetConfig(symdb.ParquetConfig{
symdbConfig := symdb.DefaultConfig()
if cfg.SymDBFormat == symdb.FormatV3 {
symdbConfig.Version = symdb.FormatV3
symdbConfig.Dir = h.headPath
} else {
symdbConfig.Version = symdb.FormatV2
symdbConfig.Dir = filepath.Join(h.headPath, symdb.DefaultDirName)
symdbConfig.Parquet = symdb.ParquetConfig{
MaxBufferRowCount: h.parquetConfig.MaxBufferRowCount,
}))
}
}

h.symdb = symdb.NewSymDB(symdbConfig)

h.wg.Add(1)
go h.loop()
Expand Down Expand Up @@ -566,7 +574,9 @@ func (h *Head) flush(ctx context.Context) error {
}
for _, file := range h.symdb.Files() {
// Files' path is relative to the symdb dir.
file.RelPath = filepath.Join(symdb.DefaultDirName, file.RelPath)
if h.symdb.FormatVersion() == symdb.FormatV2 {
file.RelPath = filepath.Join(symdb.DefaultDirName, file.RelPath)
}
files = append(files, file)
blockSize += file.SizeBytes
h.metrics.flushedFileSizeBytes.WithLabelValues(file.RelPath).Observe(float64(file.SizeBytes))
Expand Down
6 changes: 5 additions & 1 deletion pkg/phlaredb/phlaredb.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
phlareobj "github.com/grafana/pyroscope/pkg/objstore"
phlarecontext "github.com/grafana/pyroscope/pkg/phlare/context"
"github.com/grafana/pyroscope/pkg/phlaredb/block"
"github.com/grafana/pyroscope/pkg/phlaredb/symdb"
"github.com/grafana/pyroscope/pkg/util"
)

Expand All @@ -49,7 +50,10 @@ type Config struct {
// TODO: docs
RowGroupTargetSize uint64 `yaml:"row_group_target_size"`

Parquet *ParquetConfig `yaml:"-"` // Those configs should not be exposed to the user, rather they should be determined by pyroscope itself. Currently, they are solely used for test cases.
// Those configs should not be exposed to the user, rather they should be determined by pyroscope itself.
// Currently, they are solely used for test cases.
Parquet *ParquetConfig `yaml:"-"`
SymDBFormat symdb.FormatVersion `yaml:"-"`

MinFreeDisk uint64 `yaml:"min_free_disk_gb"`
MinDiskAvailablePercentage float64 `yaml:"min_disk_available_percentage"`
Expand Down
15 changes: 7 additions & 8 deletions pkg/phlaredb/schemas/v1/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ var functionsSchema = parquet.SchemaOf(new(profilev1.Function))

type FunctionPersister struct{}

func (*FunctionPersister) Name() string { return "functions" }
func (FunctionPersister) Name() string { return "functions" }

func (*FunctionPersister) Schema() *parquet.Schema { return functionsSchema }
func (FunctionPersister) Schema() *parquet.Schema { return functionsSchema }

func (*FunctionPersister) Deconstruct(row parquet.Row, fn *InMemoryFunction) parquet.Row {
func (FunctionPersister) Deconstruct(row parquet.Row, fn InMemoryFunction) parquet.Row {
if cap(row) < 5 {
row = make(parquet.Row, 0, 5)
}
Expand All @@ -27,15 +27,15 @@ func (*FunctionPersister) Deconstruct(row parquet.Row, fn *InMemoryFunction) par
return row
}

func (*FunctionPersister) Reconstruct(row parquet.Row) (*InMemoryFunction, error) {
func (FunctionPersister) Reconstruct(row parquet.Row) (InMemoryFunction, error) {
loc := InMemoryFunction{
Id: row[0].Uint64(),
Name: row[1].Uint32(),
SystemName: row[2].Uint32(),
Filename: row[3].Uint32(),
StartLine: row[4].Uint32(),
}
return &loc, nil
return loc, nil
}

type InMemoryFunction struct {
Expand All @@ -52,7 +52,6 @@ type InMemoryFunction struct {
StartLine uint32
}

func (f *InMemoryFunction) Clone() *InMemoryFunction {
n := *f
return &n
func (f InMemoryFunction) Clone() InMemoryFunction {
return f
}
16 changes: 8 additions & 8 deletions pkg/phlaredb/schemas/v1/locations.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ var locationsSchema = parquet.SchemaOf(new(profilev1.Location))

type LocationPersister struct{}

func (*LocationPersister) Name() string { return "locations" }
func (LocationPersister) Name() string { return "locations" }

func (*LocationPersister) Schema() *parquet.Schema { return locationsSchema }
func (LocationPersister) Schema() *parquet.Schema { return locationsSchema }

func (*LocationPersister) Deconstruct(row parquet.Row, loc *InMemoryLocation) parquet.Row {
func (LocationPersister) Deconstruct(row parquet.Row, loc InMemoryLocation) parquet.Row {
var (
col = -1
newCol = func() int {
Expand Down Expand Up @@ -59,7 +59,7 @@ func (*LocationPersister) Deconstruct(row parquet.Row, loc *InMemoryLocation) pa
return row
}

func (*LocationPersister) Reconstruct(row parquet.Row) (*InMemoryLocation, error) {
func (LocationPersister) Reconstruct(row parquet.Row) (InMemoryLocation, error) {
loc := InMemoryLocation{
Id: row[0].Uint64(),
MappingId: uint32(row[1].Uint64()),
Expand All @@ -74,7 +74,7 @@ func (*LocationPersister) Reconstruct(row parquet.Row) (*InMemoryLocation, error
for i, v := range lines[len(lines)/2:] {
loc.Line[i].Line = int32(v.Uint64())
}
return &loc, nil
return loc, nil
}

type InMemoryLocation struct {
Expand Down Expand Up @@ -108,11 +108,11 @@ type InMemoryLocation struct {
Line []InMemoryLine
}

func (l *InMemoryLocation) Clone() *InMemoryLocation {
x := *l
func (l InMemoryLocation) Clone() InMemoryLocation {
x := l
x.Line = make([]InMemoryLine, len(l.Line))
copy(x.Line, l.Line)
return &x
return x
}

type InMemoryLine struct {
Expand Down
15 changes: 7 additions & 8 deletions pkg/phlaredb/schemas/v1/mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ var mappingsSchema = parquet.SchemaOf(new(profilev1.Mapping))

type MappingPersister struct{}

func (*MappingPersister) Name() string { return "mappings" }
func (MappingPersister) Name() string { return "mappings" }

func (*MappingPersister) Schema() *parquet.Schema { return mappingsSchema }
func (MappingPersister) Schema() *parquet.Schema { return mappingsSchema }

func (*MappingPersister) Deconstruct(row parquet.Row, m *InMemoryMapping) parquet.Row {
func (MappingPersister) Deconstruct(row parquet.Row, m InMemoryMapping) parquet.Row {
if cap(row) < 10 {
row = make(parquet.Row, 0, 10)
}
Expand All @@ -32,7 +32,7 @@ func (*MappingPersister) Deconstruct(row parquet.Row, m *InMemoryMapping) parque
return row
}

func (*MappingPersister) Reconstruct(row parquet.Row) (*InMemoryMapping, error) {
func (MappingPersister) Reconstruct(row parquet.Row) (InMemoryMapping, error) {
mapping := InMemoryMapping{
Id: row[0].Uint64(),
MemoryStart: row[1].Uint64(),
Expand All @@ -45,7 +45,7 @@ func (*MappingPersister) Reconstruct(row parquet.Row) (*InMemoryMapping, error)
HasLineNumbers: row[8].Boolean(),
HasInlineFrames: row[9].Boolean(),
}
return &mapping, nil
return mapping, nil
}

type InMemoryMapping struct {
Expand All @@ -72,7 +72,6 @@ type InMemoryMapping struct {
HasInlineFrames bool
}

func (m *InMemoryMapping) Clone() *InMemoryMapping {
n := *m
return &n
func (m InMemoryMapping) Clone() InMemoryMapping {
return m
}
8 changes: 4 additions & 4 deletions pkg/phlaredb/schemas/v1/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package v1
import googlev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"

// Models is the type-parameter constraint enumerating the row types
// that generic persisters and readers may operate on. In-memory models
// are value types; the protobuf-generated models remain pointers.
type Models interface {
	*Profile | InMemoryProfile |
		*googlev1.Location | InMemoryLocation |
		*googlev1.Function | InMemoryFunction |
		*googlev1.Mapping | InMemoryMapping |
		*Stacktrace |
		string
}
Loading

0 comments on commit 359ecf1

Please sign in to comment.