Skip to content

Commit

Permalink
memdb: like phlaredb but not using fs (#3506)
Browse files Browse the repository at this point in the history
  • Loading branch information
korniltsev authored Aug 26, 2024
1 parent d657e80 commit ad91b32
Show file tree
Hide file tree
Showing 26 changed files with 2,933 additions and 499 deletions.
4 changes: 2 additions & 2 deletions ebpf/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ require (
github.com/cespare/xxhash/v2 v2.3.0
github.com/cilium/ebpf v0.11.0
github.com/go-kit/log v0.2.1
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8
github.com/grafana/pyroscope/api v0.4.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab
github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465
github.com/klauspost/compress v1.17.9
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.19.1
Expand Down
8 changes: 4 additions & 4 deletions ebpf/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 h1:y3N7Bm7Y9/CtpiVkw/ZWj6lSlDF3F74SfKwfTCer72Q=
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik=
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 h1:FKHo8hFI3A+7w0aUQuYXQ+6EN5stWmeY/AZqtM8xk9k=
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/grafana/pyroscope/api v0.4.0 h1:J86DxoNeLOvtJhB1Cn65JMZkXe682D+RqeoIUiYc/eo=
Expand All @@ -30,8 +30,8 @@ github.com/grafana/regexp v0.0.0-20221123153739-15dc172cd2db h1:7aN5cccjIqCLTzed
github.com/grafana/regexp v0.0.0-20221123153739-15dc172cd2db/go.mod h1:M5qHK+eWfAv8VR/265dIuEpL3fNfeC21tXXp9itM24A=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab h1:BA4a7pe6ZTd9F8kXETBoijjFJ/ntaa//1wiH9BZu4zU=
github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw=
github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465 h1:KwWnWVWCNtNq/ewIX7HIKnELmEx2nDP42yskD/pi7QE=
github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/google/go-cmp v0.6.0
github.com/google/go-github/v58 v58.0.1-0.20240111193443-e9f52699f5e5
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.0
github.com/grafana/alloy/syntax v0.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg=
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 h1:y3N7Bm7Y9/CtpiVkw/ZWj6lSlDF3F74SfKwfTCer72Q=
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik=
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 h1:FKHo8hFI3A+7w0aUQuYXQ+6EN5stWmeY/AZqtM8xk9k=
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o=
github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw=
Expand Down
1 change: 1 addition & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,7 @@ github.com/hudl/fargo v1.4.0/go.mod h1:9Ai6uvFy5fQNq6VPKtg+Ceq1+eTY4nKUlR2JElEOc
github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHLwW0=
github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab h1:HqW4xhhynfjrtEiiSGcQUd6vrK23iMam1FO8rI7mwig=
Expand Down
72 changes: 72 additions & 0 deletions pkg/experiment/ingester/bucket_mock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package ingester

import (
"context"
objstore2 "github.com/grafana/pyroscope/pkg/objstore"
"github.com/thanos-io/objstore"
"io"
)

type mockBucket struct {
upload func(ctx context.Context, name string, r io.Reader) error
}

func (m mockBucket) Close() error {

//TODO implement me
panic("implement me")
}

func (m mockBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
//TODO implement me
panic("implement me")
}

func (m mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
//TODO implement me
panic("implement me")
}

func (m mockBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
//TODO implement me
panic("implement me")
}

func (m mockBucket) Exists(ctx context.Context, name string) (bool, error) {
//TODO implement me
panic("implement me")
}

func (m mockBucket) IsObjNotFoundErr(err error) bool {
//TODO implement me
panic("implement me")
}

func (m mockBucket) IsAccessDeniedErr(err error) bool {
//TODO implement me
panic("implement me")
}

func (m mockBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
//TODO implement me
panic("implement me")
}

func (m mockBucket) Upload(ctx context.Context, name string, r io.Reader) error {
return m.upload(ctx, name, r)
}

func (m mockBucket) Delete(ctx context.Context, name string) error {
//TODO implement me
panic("implement me")
}

func (m mockBucket) Name() string {
//TODO implement me
panic("implement me")
}

func (m mockBucket) ReaderAt(ctx context.Context, filename string) (objstore2.ReaderAtCloser, error) {
//TODO implement me
panic("implement me")
}
76 changes: 0 additions & 76 deletions pkg/experiment/ingester/loki/index/cmp.go

This file was deleted.

169 changes: 169 additions & 0 deletions pkg/experiment/ingester/memdb/head.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package memdb

import (
"bytes"
"context"
"fmt"
"github.com/google/uuid"
profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
phlaremodel "github.com/grafana/pyroscope/pkg/model"
phlarelabels "github.com/grafana/pyroscope/pkg/phlaredb/labels"
schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
"github.com/grafana/pyroscope/pkg/phlaredb/symdb"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"go.uber.org/atomic"
"math"
"sync"
)

type FlushedHead struct {
Index []byte
Profiles []byte
Symbols []byte
Meta struct {
ProfileTypeNames []string
MinTimeNanos int64
MaxTimeNanos int64
NumSamples uint64
NumProfiles uint64
NumSeries uint64
}
}

type Head struct {
symbols *symdb.PartitionWriter
metaLock sync.RWMutex
minTimeNanos int64
maxTimeNanos int64
totalSamples *atomic.Uint64
profiles *profilesIndex
metrics *HeadMetrics
}

func NewHead(metrics *HeadMetrics) *Head {
h := &Head{
metrics: metrics,
symbols: symdb.NewPartitionWriter(0, &symdb.Config{
Version: symdb.FormatV3,
Stacktraces: symdb.StacktracesConfig{
MaxNodesPerChunk: 4 << 20,
},
}),
totalSamples: atomic.NewUint64(0),
minTimeNanos: math.MaxInt64,
maxTimeNanos: 0,
profiles: newProfileIndex(metrics),
}

return h
}

func (h *Head) Ingest(p *profilev1.Profile, id uuid.UUID, externalLabels []*typesv1.LabelPair) {
if len(p.Sample) == 0 {
return
}

// delta not supported
externalLabels = phlaremodel.Labels(externalLabels).Delete(phlaremodel.LabelNameDelta)

enforceLabelOrder := phlaremodel.Labels(externalLabels).Get(phlaremodel.LabelNameOrder) == phlaremodel.LabelOrderEnforced
externalLabels = phlaremodel.Labels(externalLabels).Delete(phlaremodel.LabelNameOrder)

lbls, seriesFingerprints := phlarelabels.CreateProfileLabels(enforceLabelOrder, p, externalLabels...)

metricName := phlaremodel.Labels(externalLabels).Get(model.MetricNameLabel)

var profileIngested bool
memProfiles := h.symbols.WriteProfileSymbols(p)
for idxType := range memProfiles {
profile := &memProfiles[idxType]
profile.ID = id
profile.SeriesFingerprint = seriesFingerprints[idxType]
profile.Samples = profile.Samples.Compact(false)

profile.TotalValue = profile.Samples.Sum()

if profile.Samples.Len() == 0 {
continue
}

h.profiles.Add(profile, lbls[idxType], metricName)

profileIngested = true
h.totalSamples.Add(uint64(profile.Samples.Len()))
h.metrics.sampleValuesIngested.WithLabelValues(metricName).Add(float64(profile.Samples.Len()))
h.metrics.sampleValuesReceived.WithLabelValues(metricName).Add(float64(len(p.Sample)))
}

if !profileIngested {
return
}

h.metaLock.Lock()
if p.TimeNanos < h.minTimeNanos {
h.minTimeNanos = p.TimeNanos
}
if p.TimeNanos > h.maxTimeNanos {
h.maxTimeNanos = p.TimeNanos
}
h.metaLock.Unlock()
}

func (h *Head) Flush(ctx context.Context) (res *FlushedHead, err error) {
t := prometheus.NewTimer(h.metrics.flushedBlockDurationSeconds)
defer t.ObserveDuration()

if res, err = h.flush(ctx); err != nil {
h.metrics.flushedBlocks.WithLabelValues("failed").Inc()
return nil, err
}

blockSize := len(res.Index) + len(res.Profiles) + len(res.Symbols)
h.metrics.flushedBlocks.WithLabelValues("success").Inc()
h.metrics.flushedBlockSamples.Observe(float64(res.Meta.NumSamples))
h.metrics.flusehdBlockProfiles.Observe(float64(res.Meta.NumProfiles))
h.metrics.flushedBlockSeries.Observe(float64(res.Meta.NumSeries))
h.metrics.flushedBlockSizeBytes.Observe(float64(blockSize))
h.metrics.flushedFileSizeBytes.WithLabelValues("tsdb").Observe(float64(len(res.Index)))
h.metrics.flushedFileSizeBytes.WithLabelValues("profiles.parquet").Observe(float64(len(res.Profiles)))
h.metrics.flushedFileSizeBytes.WithLabelValues("symbols.symdb").Observe(float64(len(res.Symbols)))
return res, nil
}

func (h *Head) flush(ctx context.Context) (*FlushedHead, error) {
var (
err error
profiles []schemav1.InMemoryProfile
)
res := new(FlushedHead)
res.Meta.MinTimeNanos = h.minTimeNanos
res.Meta.MaxTimeNanos = h.maxTimeNanos
res.Meta.NumSamples = h.totalSamples.Load()
res.Meta.NumSeries = uint64(h.profiles.totalSeries.Load())

if res.Meta.NumSamples == 0 {
return res, nil
}

symbolsBuffer := bytes.NewBuffer(nil)
if err := symdb.WritePartition(h.symbols, symbolsBuffer); err != nil {
return nil, err
}
res.Symbols = symbolsBuffer.Bytes()

if res.Meta.ProfileTypeNames, err = h.profiles.profileTypeNames(); err != nil {
return nil, fmt.Errorf("failed to get profile type names: %w", err)
}

if res.Index, profiles, err = h.profiles.Flush(ctx); err != nil {
return nil, fmt.Errorf("failed to flush profiles: %w", err)
}
res.Meta.NumProfiles = uint64(len(profiles))

if res.Profiles, err = WriteProfiles(h.metrics, profiles); err != nil {
return nil, fmt.Errorf("failed to write profiles parquet: %w", err)
}
return res, nil
}
Loading

0 comments on commit ad91b32

Please sign in to comment.