Skip to content

Commit

Permalink
observer pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
alsoba13 committed Jan 27, 2025
1 parent babebf4 commit ff45b50
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 56 deletions.
98 changes: 46 additions & 52 deletions pkg/experiment/block/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,14 @@ import (
"strings"
"sync"

"github.com/cespare/xxhash/v2"
"github.com/grafana/dskit/multierror"
"github.com/oklog/ulid"
"github.com/parquet-go/parquet-go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage"
"golang.org/x/sync/errgroup"

metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
"github.com/grafana/pyroscope/pkg/experiment/block/metadata"
"github.com/grafana/pyroscope/pkg/experiment/metrics"
phlaremodel "github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/objstore"
"github.com/grafana/pyroscope/pkg/phlaredb/block"
Expand Down Expand Up @@ -54,11 +51,30 @@ func WithCompactionDestination(storage objstore.Bucket) CompactionOption {
}
}

func WithSampleObserver(observer SampleObserver) CompactionOption {
return func(p *compactionConfig) {
p.sampleObserver = observer
}
}

type compactionConfig struct {
objectOptions []ObjectOption
tempdir string
source objstore.BucketReader
destination objstore.Bucket
objectOptions []ObjectOption
tempdir string
source objstore.BucketReader
destination objstore.Bucket
sampleObserver SampleObserver
}

type SampleObserver interface {
// Observe is called before the compactor appends the entry
// to the output block. This method must not modify the entry.
Observe(ProfileEntry)

// Flush is called before the compactor flushes the output dataset.
// This call invalidates all references (such as symbols) to the source
// and output blocks. Any error returned by the call terminates the
// compaction job: it's caller responsibility to suppress errors.
Flush() error
}

func Compact(
Expand Down Expand Up @@ -91,19 +107,11 @@ func Compact(

compacted := make([]*metastorev1.BlockMeta, 0, len(plan))
for _, p := range plan {
md, compactionErr := p.Compact(ctx, c.destination, c.tempdir)
md, compactionErr := p.Compact(ctx, c.destination, c.tempdir, c.sampleObserver)
if compactionErr != nil {
return nil, compactionErr
}
compacted = append(compacted, md)

if p.metricsExporter != nil {
go func() {
if sendErr := p.SendRecordedMetrics(); sendErr != nil {
println("ERROR", sendErr) // TODO
}
}()
}
}

return compacted, nil
Expand Down Expand Up @@ -160,13 +168,12 @@ func PlanCompaction(objects Objects) ([]*CompactionPlan, error) {
}

type CompactionPlan struct {
tenant string
path string
datasetMap map[int32]*datasetCompaction
datasets []*datasetCompaction
meta *metastorev1.BlockMeta
strings *metadata.StringTable
metricsExporter *metrics.Exporter
tenant string
path string
datasetMap map[int32]*datasetCompaction
datasets []*datasetCompaction
meta *metastorev1.BlockMeta
strings *metadata.StringTable
}

func newBlockCompaction(
Expand All @@ -188,27 +195,30 @@ func newBlockCompaction(
Shard: shard,
CompactionLevel: compactionLevel,
}
if compactionLevel == 1 {
p.metricsExporter = metrics.NewExporter(tenant)
}
return p
}

func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tmpdir string) (m *metastorev1.BlockMeta, err error) {
func (b *CompactionPlan) Compact(
ctx context.Context,
dst objstore.Bucket,
tmpdir string,
observer SampleObserver,
) (m *metastorev1.BlockMeta, err error) {
w := NewBlockWriter(dst, b.path, tmpdir)
defer func() {
err = multierror.New(err, w.Close()).Err()
}()
// Datasets are compacted in a strict order.
for _, s := range b.datasets {
s.registerSampleObserver(observer)
if err = s.compact(ctx, w); err != nil {
return nil, fmt.Errorf("compacting block: %w", err)
}
if b.metricsExporter != nil {
b.metricsExporter.AppendMetrics(s.metricsRecorder.Recordings)
}
b.meta.Datasets = append(b.meta.Datasets, s.meta)
}
if err = observer.Flush(); err != nil {
return nil, fmt.Errorf("flushing sample observer: %w", err)
}
if err = w.Flush(ctx); err != nil {
return nil, fmt.Errorf("flushing block writer: %w", err)
}
Expand All @@ -235,10 +245,6 @@ func (b *CompactionPlan) addDataset(md *metastorev1.BlockMeta, s *metastorev1.Da
return sm
}

func (c *CompactionPlan) SendRecordedMetrics() error {
return c.metricsExporter.Send()
}

type datasetCompaction struct {
// Dataset name.
name string
Expand All @@ -259,7 +265,7 @@ type datasetCompaction struct {

flushOnce sync.Once

metricsRecorder *metrics.Recorder
observer SampleObserver
}

func (b *CompactionPlan) newDatasetCompaction(tenant, name int32) *datasetCompaction {
Expand Down Expand Up @@ -308,6 +314,10 @@ func (m *datasetCompaction) compact(ctx context.Context, w *Writer) (err error)
return nil
}

func (m *datasetCompaction) registerSampleObserver(observer SampleObserver) {
m.observer = observer
}

func (m *datasetCompaction) open(ctx context.Context, path string) (err error) {
m.path = path
defer func() {
Expand All @@ -333,13 +343,6 @@ func (m *datasetCompaction) open(ctx context.Context, path string) (err error) {
m.indexRewriter = newIndexRewriter(m.path)
m.symbolsRewriter = newSymbolsRewriter(m.path)

if m.parent.meta.CompactionLevel == 1 {
recordingTime := int64(ulid.MustParse(m.parent.meta.Id).Time())
rules := metrics.RecordingRulesFromTenant(m.parent.tenant)
pyroscopeInstance := pyroscopeInstanceHash(m.parent.meta.Shard, m.parent.meta.CreatedBy)
m.metricsRecorder = metrics.NewRecorder(rules, recordingTime, pyroscopeInstance)
}

g, ctx := errgroup.WithContext(ctx)
for _, s := range m.datasets {
s := s
Expand All @@ -361,13 +364,6 @@ func (m *datasetCompaction) open(ctx context.Context, path string) (err error) {
return nil
}

func pyroscopeInstanceHash(shard uint32, createdBy int32) string {
buf := make([]byte, 0, 8)
buf = append(buf, byte(shard>>24), byte(shard>>16), byte(shard>>8), byte(shard))
buf = append(buf, byte(createdBy>>24), byte(createdBy>>16), byte(createdBy>>8), byte(createdBy))
return fmt.Sprintf("%x", xxhash.Sum64(buf))
}

func (m *datasetCompaction) mergeAndClose(ctx context.Context) (err error) {
defer func() {
err = multierror.New(err, m.close()).Err()
Expand Down Expand Up @@ -404,9 +400,7 @@ func (m *datasetCompaction) writeRow(r ProfileEntry) (err error) {
if err = m.symbolsRewriter.rewriteRow(r); err != nil {
return err
}
if m.metricsRecorder != nil {
m.metricsRecorder.RecordRow(r.Fingerprint, r.Labels, r.Row.TotalValue())
}
m.observer.Observe(r)
return m.profilesWriter.writeRow(r)
}

Expand Down
12 changes: 11 additions & 1 deletion pkg/experiment/compactor/compaction_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
"github.com/grafana/pyroscope/pkg/experiment/block"
"github.com/grafana/pyroscope/pkg/experiment/block/metadata"
metrics2 "github.com/grafana/pyroscope/pkg/experiment/metrics"
"github.com/grafana/pyroscope/pkg/objstore"
"github.com/grafana/pyroscope/pkg/util"
)
Expand Down Expand Up @@ -399,7 +400,9 @@ func (w *Worker) runCompaction(job *compactionJob) {
block.WithCompactionObjectOptions(
block.WithObjectMaxSizeLoadInMemory(w.config.SmallObjectSize),
block.WithObjectDownload(sourcedir),
))
),
block.WithSampleObserver(newSampleObserver(job)),
)
defer func() {
if err = os.RemoveAll(tempdir); err != nil {
level.Warn(logger).Log("msg", "failed to remove compaction directory", "path", tempdir, "err", err)
Expand Down Expand Up @@ -458,6 +461,13 @@ func (w *Worker) runCompaction(job *compactionJob) {
_ = deleteGroup.Wait()
}

func newSampleObserver(job *compactionJob) block.SampleObserver {
if job.CompactionLevel == 0 {
return metrics2.NewMetricsObserver(job.Tenant, job.blocks[0])
}
return &metrics2.NoOpObserver{}
}

func (w *Worker) getBlockMetadata(logger log.Logger, job *compactionJob) error {
ctx, cancel := context.WithTimeout(job.ctx, w.config.RequestTimeout)
defer cancel()
Expand Down
52 changes: 52 additions & 0 deletions pkg/experiment/metrics/observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package metrics

import (
"fmt"

"github.com/cespare/xxhash/v2"
"github.com/oklog/ulid"

metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
"github.com/grafana/pyroscope/pkg/experiment/block"
)

type MetricsObserver struct {
tenant string
recorder *Recorder
}

func NewMetricsObserver(tenant string, meta *metastorev1.BlockMeta) *MetricsObserver {
recordingTime := int64(ulid.MustParse(meta.Id).Time())
rules := recordingRulesFromTenant(tenant)
pyroscopeInstance := pyroscopeInstanceHash(meta.Shard, meta.CreatedBy)
return &MetricsObserver{
tenant: tenant,
recorder: NewRecorder(rules, recordingTime, pyroscopeInstance),
}
}

func pyroscopeInstanceHash(shard uint32, createdBy int32) string {
buf := make([]byte, 0, 8)
buf = append(buf, byte(shard>>24), byte(shard>>16), byte(shard>>8), byte(shard))
buf = append(buf, byte(createdBy>>24), byte(createdBy>>16), byte(createdBy>>8), byte(createdBy))
return fmt.Sprintf("%x", xxhash.Sum64(buf))
}

func (o *MetricsObserver) Observe(row block.ProfileEntry) {
o.recorder.RecordRow(row.Fingerprint, row.Labels, row.Row.TotalValue())
}

func (o *MetricsObserver) Flush() error {
exporter := NewExporter(o.tenant)
exporter.AppendMetrics(o.recorder.Recordings)
return exporter.Send()
}

type NoOpObserver struct{}

func (o *NoOpObserver) Observe(row block.ProfileEntry) {
}

func (o *NoOpObserver) Flush() error {
return nil
}
3 changes: 1 addition & 2 deletions pkg/experiment/metrics/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func NewRecorder(recordingRules []*RecordingRule, recordingTime int64, pyroscope
for i, rule := range recordingRules {
recordings[i] = &Recording{
rule: *rule,
// fps: make(map[model.Fingerprint]*AggregatedFingerprint),
data: make(map[AggregatedFingerprint]*TimeSeries),
state: &recordingState{
fp: nil,
Expand Down Expand Up @@ -111,7 +110,7 @@ func generateExportedLabels(labelsMap map[string]string, rec *Recording, pyrosco
Value: rec.rule.metricName,
},
labels.Label{
Name: "__pyroscope_instance__",
Name: "pyroscope_instance",
Value: pyroscopeInstance,
},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/experiment/metrics/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type RecordingRule struct {
keepLabels []string
}

func RecordingRulesFromTenant(tenant string) []*RecordingRule {
func recordingRulesFromTenant(tenant string) []*RecordingRule {
// TODO
return []*RecordingRule{
{
Expand Down

0 comments on commit ff45b50

Please sign in to comment.