feat(distributor): Relabel profiles at ingest #3369

Merged: 2 commits, Jun 26, 2024
227 changes: 194 additions & 33 deletions pkg/distributor/distributor.go
@@ -12,6 +12,7 @@ import (
"sort"
"sync"
"time"
"unsafe"

"connectrpc.com/connect"
"github.com/dustin/go-humanize"
@@ -31,14 +32,15 @@ import (
"github.com/prometheus/common/model"
"go.uber.org/atomic"

googlev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
connectapi "github.com/grafana/pyroscope/pkg/api/connect"
"github.com/grafana/pyroscope/pkg/clientpool"
"github.com/grafana/pyroscope/pkg/distributor/aggregator"
distributormodel "github.com/grafana/pyroscope/pkg/distributor/model"
phlaremodel "github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/model/relabel"
"github.com/grafana/pyroscope/pkg/pprof"
"github.com/grafana/pyroscope/pkg/slices"
"github.com/grafana/pyroscope/pkg/tenant"
@@ -123,6 +125,7 @@ type Limits interface {
MaxProfileSymbolValueLength(tenantID string) int
MaxSessionsPerSeries(tenantID string) int
EnforceLabelsOrder(tenantID string) bool
IngestionRelabelingRules(tenantID string) []*relabel.Config
validation.ProfileValidationLimits
aggregator.Limits
}
@@ -399,8 +402,11 @@ func (d *Distributor) sendRequests(ctx context.Context, req *distributormodel.Pu
series.Labels = d.limitMaxSessionsPerSeries(maxSessionsPerSeries, series.Labels)
}

// Next we split profiles by labels.
profileSeries := extractSampleSeries(req)
// Next we split profiles by labels and apply relabel rules.
profileSeries, bytesRelabelDropped, profilesRelabelDropped := extractSampleSeries(req, d.limits.IngestionRelabelingRules(tenantID))
validation.DiscardedBytes.WithLabelValues(string(validation.RelabelRules), tenantID).Add(bytesRelabelDropped)
validation.DiscardedProfiles.WithLabelValues(string(validation.RelabelRules), tenantID).Add(profilesRelabelDropped)

// Filter out series and profiles without samples.
for _, series := range profileSeries {
series.Samples = slices.RemoveInPlace(series.Samples, func(sample *distributormodel.ProfileSample, _ int) bool {
Expand Down Expand Up @@ -491,8 +497,20 @@ func (d *Distributor) sendRequests(ctx context.Context, req *distributormodel.Pu
}
}

// sampleSize returns the size of the given samples in bytes.
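// Label key/str/num_unit values are indices into the profile's string table,
// so the referenced strings are counted explicitly on top of Sample.SizeVT,
// which only accounts for the indices themselves.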
func sampleSize(stringTable []string, samplesSlice []*profilev1.Sample) int64 {
var size int64
for _, s := range samplesSlice {
size += int64(s.SizeVT())
for _, l := range s.Label {
size += int64(len(stringTable[l.Key]) + len(stringTable[l.Str]) + len(stringTable[l.NumUnit]))
}
}
return size
}

// profileSizeBytes returns the size of symbols and samples in bytes.
func profileSizeBytes(p *googlev1.Profile) (symbols, samples int64) {
func profileSizeBytes(p *profilev1.Profile) (symbols, samples int64) {
fullSize := p.SizeVT()
// remove samples
samplesSlice := p.Sample
@@ -516,7 +534,7 @@ func profileSizeBytes(p *googlev1.Profile) (symbols, samples int64) {
return
}

func (d *Distributor) maybeAggregate(tenantID string, labels phlaremodel.Labels, profile *googlev1.Profile) (func() (*pprof.ProfileMerge, error), bool, error) {
func (d *Distributor) maybeAggregate(tenantID string, labels phlaremodel.Labels, profile *profilev1.Profile) (func() (*pprof.ProfileMerge, error), bool, error) {
a, ok := d.aggregator.AggregatorForTenant(tenantID)
if !ok {
return nil, false, nil
@@ -534,7 +552,7 @@ func (d *Distributor) maybeAggregate(tenantID string, labels phlaremodel.Labels,
return r.Handler(), true, nil
}

func mergeProfile(profile *googlev1.Profile) aggregator.AggregateFn[*pprof.ProfileMerge] {
func mergeProfile(profile *profilev1.Profile) aggregator.AggregateFn[*pprof.ProfileMerge] {
return func(m *pprof.ProfileMerge) (*pprof.ProfileMerge, error) {
if m == nil {
m = new(pprof.ProfileMerge)
@@ -633,7 +651,106 @@ func (d *Distributor) HealthyInstancesCount() int {
return int(d.healthyInstancesCount.Load())
}

func extractSampleSeries(req *distributormodel.PushRequest) []*distributormodel.ProfileSeries {
type sampleKey struct {
stacktrace string
// note this is an index into the string table, rather than span ID
spanIDIdx int64
}

func sampleKeyFromSample(stringTable []string, s *profilev1.Sample) sampleKey {
var k sampleKey

// populate spanID if present
for _, l := range s.Label {
if stringTable[int(l.Key)] == pprof.SpanIDLabelName {
k.spanIDIdx = l.Str
}
}
if len(s.LocationId) > 0 {
k.stacktrace = unsafe.String(
(*byte)(unsafe.Pointer(&s.LocationId[0])),
len(s.LocationId)*8,
)
}
return k
}
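
The stacktrace key above is built without copying: the sample's LocationId slice is viewed as raw bytes via unsafe.String. A minimal standalone sketch of the same zero-copy trick, with made-up location IDs for illustration:

package main

import (
	"fmt"
	"unsafe"
)

func main() {
	// ids stands in for a sample's LocationId slice. unsafe.String views its
	// backing array as a string of len(ids)*8 bytes (8 bytes per uint64), so
	// samples with identical stacks produce equal map keys without allocating.
	// The key is only valid while ids stays alive and unmodified.
	ids := []uint64{101, 202, 303}
	key := unsafe.String((*byte)(unsafe.Pointer(&ids[0])), len(ids)*8)
	fmt.Println(len(key)) // 24
}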

type lazyGroup struct {
sampleGroup pprof.SampleGroup
// The map is only initialized when the group is being modified. Key is the
// string representation (unsafe) of the sample stack trace and its potential
// span ID.
sampleMap map[sampleKey]*profilev1.Sample
labels phlaremodel.Labels
}

func (g *lazyGroup) addSampleGroup(stringTable []string, sg pprof.SampleGroup) {
if len(g.sampleGroup.Samples) == 0 {
g.sampleGroup = sg
return
}

// If the group is already initialized, we need to merge the samples.
if g.sampleMap == nil {
g.sampleMap = make(map[sampleKey]*profilev1.Sample)
for _, s := range g.sampleGroup.Samples {
g.sampleMap[sampleKeyFromSample(stringTable, s)] = s
}
}

for _, s := range sg.Samples {
k := sampleKeyFromSample(stringTable, s)
if _, ok := g.sampleMap[k]; !ok {
g.sampleGroup.Samples = append(g.sampleGroup.Samples, s)
g.sampleMap[k] = s
} else {
// merge the samples
for idx := range s.Value {
g.sampleMap[k].Value[idx] += s.Value[idx]
}
}
}
}

type groupsWithFingerprints struct {
m map[uint64][]lazyGroup
order []uint64
}

func newGroupsWithFingerprints() *groupsWithFingerprints {
return &groupsWithFingerprints{
m: make(map[uint64][]lazyGroup),
}
}

func (g *groupsWithFingerprints) add(stringTable []string, lbls phlaremodel.Labels, group pprof.SampleGroup) {
fp := lbls.Hash()
idxs, ok := g.m[fp]
if ok {
// fingerprint matches, check if the labels are the same
for _, idx := range idxs {
if phlaremodel.CompareLabelPairs(idx.labels, lbls) == 0 {
// append samples to the group
idx.addSampleGroup(stringTable, group)
return
}
}
} else {
g.order = append(g.order, fp)
}

// add the labels to the list
g.m[fp] = append(g.m[fp], lazyGroup{
sampleGroup: group,
labels: lbls,
})
}

func extractSampleSeries(req *distributormodel.PushRequest, relabelRules []*relabel.Config) (result []*distributormodel.ProfileSeries, bytesRelabelDropped, profilesRelabelDropped float64) {
var (
lblbuilder = phlaremodel.NewLabelsBuilder(phlaremodel.EmptyLabels())
)

profileSeries := make([]*distributormodel.ProfileSeries, 0, len(req.Series))
for _, series := range req.Series {
s := &distributormodel.ProfileSeries{
@@ -643,33 +760,74 @@ func extractSampleSeries(req *distributormodel.PushRequest) []*distributormodel.
for _, raw := range series.Samples {
pprof.RenameLabel(raw.Profile.Profile, pprof.ProfileIDLabelName, pprof.SpanIDLabelName)
groups := pprof.GroupSamplesWithoutLabels(raw.Profile.Profile, pprof.SpanIDLabelName)

if len(groups) == 0 || (len(groups) == 1 && len(groups[0].Labels) == 0) {
// No sample labels in the profile.

// relabel the labels of the series
lblbuilder.Reset(series.Labels)
if len(relabelRules) > 0 {
keep := relabel.ProcessBuilder(lblbuilder, relabelRules...)
if !keep {
bytesRelabelDropped += float64(raw.Profile.SizeVT())
profilesRelabelDropped++ // in this case we dropped a whole profile
continue
}
}

// Copy over the labels from the builder
s.Labels = lblbuilder.Labels()

// We do not modify the request.
s.Samples = append(s.Samples, raw)

continue
}
e := pprof.NewSampleExporter(raw.Profile.Profile)

// iterate through the groups, relabel them, and find relevant overlapping label sets
groupsKept := newGroupsWithFingerprints()
for _, group := range groups {
// exportSamples creates a new profile with the samples provided.
// The samples are obtained via GroupSamples call, which means
// the underlying capacity is referenced by the source profile.
// Therefore, the slice has to be copied and samples zeroed to
// avoid ownership issues.
profile := exportSamples(e, group.Samples)
// Note that group.Labels reference strings from the source profile.
labels := mergeSeriesAndSampleLabels(raw.Profile.Profile, series.Labels, group.Labels)
profileSeries = append(profileSeries, &distributormodel.ProfileSeries{
Labels: labels,
Samples: []*distributormodel.ProfileSample{{Profile: profile}},
})
lblbuilder.Reset(series.Labels)
addSampleLabelsToLabelsBuilder(lblbuilder, raw.Profile.Profile, group.Labels)
if len(relabelRules) > 0 {
keep := relabel.ProcessBuilder(lblbuilder, relabelRules...)
if !keep {
bytesRelabelDropped += float64(sampleSize(raw.Profile.Profile.StringTable, group.Samples))
continue
}
}

// add the group to the list
groupsKept.add(raw.Profile.StringTable, lblbuilder.Labels(), group)
}

if len(groupsKept.m) == 0 {
// no groups kept, count the whole profile as dropped
profilesRelabelDropped++
continue
}

e := pprof.NewSampleExporter(raw.Profile.Profile)
for _, idx := range groupsKept.order {
for _, group := range groupsKept.m[idx] {
// exportSamples creates a new profile with the samples provided.
// The samples are obtained via GroupSamples call, which means
// the underlying capacity is referenced by the source profile.
// Therefore, the slice has to be copied and samples zeroed to
// avoid ownership issues.
profile := exportSamples(e, group.sampleGroup.Samples)
profileSeries = append(profileSeries, &distributormodel.ProfileSeries{
Labels: group.labels,
Samples: []*distributormodel.ProfileSample{{Profile: profile}},
})
}
}
}
if len(s.Samples) > 0 {
profileSeries = append(profileSeries, s)
}
}
return profileSeries
return profileSeries, bytesRelabelDropped, profilesRelabelDropped
}
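
For reference, the keep/drop decision that extractSampleSeries makes for each series and sample group can be exercised in isolation. The sketch below is not part of this PR; it only uses APIs visible in the diff (phlaremodel.LabelsBuilder, relabel.ProcessBuilder), and relabelSeries is a hypothetical helper name:

package main

import (
	"fmt"

	phlaremodel "github.com/grafana/pyroscope/pkg/model"
	"github.com/grafana/pyroscope/pkg/model/relabel"
)

// relabelSeries applies a tenant's ingestion relabeling rules to one set of
// series labels and reports whether the series should be kept, mirroring the
// drop/keep decision made in extractSampleSeries.
func relabelSeries(lbls phlaremodel.Labels, rules []*relabel.Config) (phlaremodel.Labels, bool) {
	builder := phlaremodel.NewLabelsBuilder(phlaremodel.EmptyLabels())
	builder.Reset(lbls)
	if len(rules) > 0 && !relabel.ProcessBuilder(builder, rules...) {
		// Dropped by the rules; the caller accounts for the discarded bytes
		// and profiles, as the distributor does via validation.DiscardedBytes.
		return nil, false
	}
	return builder.Labels(), true
}

func main() {
	// With no rules configured, labels pass through unchanged.
	out, kept := relabelSeries(phlaremodel.Labels{{Name: "service_name", Value: "api"}}, nil)
	fmt.Println(kept, len(out))
}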

func (d *Distributor) limitMaxSessionsPerSeries(maxSessionsPerSeries int, labels phlaremodel.Labels) phlaremodel.Labels {
@@ -712,22 +870,25 @@ func (d *Distributor) rateLimit(tenantID string, req *distributormodel.PushReque
return nil
}

// mergeSeriesAndSampleLabels merges sample labels with
// series labels. Series labels take precedence.
func mergeSeriesAndSampleLabels(p *googlev1.Profile, sl []*typesv1.LabelPair, pl []*googlev1.Label) []*typesv1.LabelPair {
m := phlaremodel.Labels(sl).Clone()
// addSampleLabelsToLabelsBuilder adds sample labels that don't yet exist on the labels builder, so the existing (series) labels take precedence.
func addSampleLabelsToLabelsBuilder(b *phlaremodel.LabelsBuilder, p *profilev1.Profile, pl []*profilev1.Label) {
var name string
for _, l := range pl {
m = append(m, &typesv1.LabelPair{
Name: p.StringTable[l.Key],
Value: p.StringTable[l.Str],
})
name = p.StringTable[l.Key]
if l.Str <= 0 {
// skip if label value is not a string
continue
}
if b.Get(name) != "" {
// do nothing if label name already exists
continue
}
b.Set(name, p.StringTable[l.Str])
}
sort.Stable(m)
return m.Unique()
}

func exportSamples(e *pprof.SampleExporter, samples []*googlev1.Sample) *pprof.Profile {
samplesCopy := make([]*googlev1.Sample, len(samples))
func exportSamples(e *pprof.SampleExporter, samples []*profilev1.Sample) *pprof.Profile {
samplesCopy := make([]*profilev1.Sample, len(samples))
copy(samplesCopy, samples)
slices.Clear(samples)
n := pprof.NewProfile()