Revert "feat: cumulative pprof merge for pull mode (#1794)" (#1811)
This reverts commit 89265cd.
korniltsev authored Jan 18, 2023
1 parent 58b43a9 commit 086d3b2
Showing 9 changed files with 31 additions and 256 deletions.
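For quick orientation, these are the public signatures the revert touches, collected from the hunks below — the "-" side is the reverted #1794 feature, the "+" side is what this commit restores:

```go
// Removed (feature #1794): callers passed a flag controlling server-side
// merging of cumulative pprof profiles.
//   func NewManager(logger logrus.FieldLogger, p ingestion.Ingester, r prometheus.Registerer, disableCumulativeMerge bool) *Manager
//   func NewIngestHandler(log *logrus.Logger, p ingestion.Ingester, onSuccess func(*ingestion.IngestInput), httpUtils httputils.Utils, disableCumulativeMerge bool) http.Handler
//   func (p *RawProfile) Push(profile []byte, cumulative, mergeCumulative bool) *RawProfile
//
// Restored: the flag is gone and cumulative handling is left to the parser.
//   func NewManager(logger logrus.FieldLogger, p ingestion.Ingester, r prometheus.Registerer) *Manager
//   func NewIngestHandler(log *logrus.Logger, p ingestion.Ingester, onSuccess func(*ingestion.IngestInput), httpUtils httputils.Utils) http.Handler
//   func (p *RawProfile) Push(profile []byte, cumulative bool) *RawProfile
```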
2 changes: 1 addition & 1 deletion pkg/adhoc/pull.go
@@ -50,7 +50,7 @@ func newPull(cfg *config.Adhoc, args []string, st *storage.Storage, logger *logr
}

p := parser.New(logger, st, e)
- m := scrape.NewManager(logger, p, defaultMetricsRegistry, true)
+ m := scrape.NewManager(logger, p, defaultMetricsRegistry)
scrapeCfg := &(*scrapeconfig.DefaultConfig())
scrapeCfg.JobName = "adhoc"
scrapeCfg.EnabledProfiles = []string{"cpu", "mem"}
2 changes: 1 addition & 1 deletion pkg/adhoc/push.go
@@ -36,7 +36,7 @@ func newPush(_ *config.Adhoc, args []string, st *storage.Storage, logger *logrus
p := parser.New(logger, st, e)
return push{
args: args,
- handler: server.NewIngestHandler(logger, p, func(*ingestion.IngestInput) {}, httputils.NewDefaultHelper(logger), true),
+ handler: server.NewIngestHandler(logger, p, func(*ingestion.IngestInput) {}, httputils.NewDefaultHelper(logger)),
logger: logger,
}, nil
}
3 changes: 1 addition & 2 deletions pkg/cli/server.go
@@ -201,8 +201,7 @@ func newServerService(c *config.Server) (*serverService, error) {
svc.scrapeManager = scrape.NewManager(
svc.logger.WithField("component", "scrape-manager"),
ingester,
- defaultMetricsRegistry,
- !svc.config.RemoteWrite.Enabled)
+ defaultMetricsRegistry)

svc.controller, err = server.New(server.Config{
Configuration: svc.config,
58 changes: 4 additions & 54 deletions pkg/convert/pprof/profile.go
@@ -4,9 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"github.com/pyroscope-io/pyroscope/pkg/convert/pprof/streaming"
"github.com/pyroscope-io/pyroscope/pkg/util/cumulativepprof"
"io"
"mime/multipart"
"sync"
@@ -25,8 +23,6 @@ type RawProfile struct {
// References the next profile in the sequence (cumulative type only).
next *RawProfile

- mergers *cumulativepprof.Mergers

m sync.Mutex
// Initializes lazily on Bytes, if not present.
RawData []byte // Represents raw request body as per ingestion API.
@@ -57,75 +53,29 @@ func (p *RawProfile) ContentType() string {
// two consecutive samples to calculate the diff. If parser is not
// present due to a failure, or sequence violation, the profiles will
// be re-parsed.
- func (p *RawProfile) Push(profile []byte, cumulative, mergeCumulative bool) *RawProfile {
+ func (p *RawProfile) Push(profile []byte, cumulative bool) *RawProfile {
p.m.Lock()
p.Profile = profile
p.RawData = nil
n := &RawProfile{
SampleTypeConfig: p.SampleTypeConfig,
}
if cumulative {
n := &RawProfile{
SampleTypeConfig: p.SampleTypeConfig,
}
// N.B the parser state is only propagated
// after successful Parse call.
n.PreviousProfile = p.Profile
p.next = n
- if mergeCumulative {
- mergers := p.mergers
- if mergers == nil {
- mergers = cumulativepprof.NewMergers()
- }
- err := p.mergeCumulativeLocked(mergers)
- if err == nil {
- n.mergers = mergers
- }
- }
}
p.m.Unlock()
return p.next
}

- func (p *RawProfile) MergeCumulative(ms *cumulativepprof.Mergers) error {
- p.m.Lock()
- defer p.m.Unlock()
- return p.mergeCumulativeLocked(ms)
- }
-
- func (p *RawProfile) mergeCumulativeLocked(ms *cumulativepprof.Mergers) error {
- if p.Profile == nil && p.PreviousProfile == nil && p.RawData != nil && p.FormDataContentType != "" {
- err := p.loadPprofFromForm()
- if err != nil {
- return err
- }
- }
- if p.PreviousProfile == nil {
- return ErrCumulativeMergeNoPreviousProfile
- }
- merged, stConfig, err := ms.Merge(p.PreviousProfile, p.Profile, p.SampleTypeConfig)
- if err != nil {
- return err
- }
- var mergedProfileBytes bytes.Buffer
- err = merged.Write(&mergedProfileBytes)
- if err != nil {
- return err
- }
- p.Profile = mergedProfileBytes.Bytes()
- p.PreviousProfile = nil
- p.SampleTypeConfig = stConfig
- p.RawData = nil
- return nil
- }

const (
formFieldProfile, formFileProfile = "profile", "profile.pprof"
formFieldPreviousProfile, formFilePreviousProfile = "prev_profile", "profile.pprof"
formFieldSampleTypeConfig, formFileSampleTypeConfig = "sample_type_config", "sample_type_config.json"
)

- var (
- ErrCumulativeMergeNoPreviousProfile = errors.New("no previous profile for cumulative merge")
- )

func (p *RawProfile) Bytes() ([]byte, error) {
p.m.Lock()
defer p.m.Unlock()
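Post-revert, cumulative profile types are handled by chaining RawProfiles rather than merging: each Push seeds the next sample's PreviousProfile so the parser can diff two consecutive payloads. A minimal sketch of that chaining (body1/body2 are illustrative placeholders for two scraped pprof payloads, not names from the diff):

```go
package main

import "github.com/pyroscope-io/pyroscope/pkg/convert/pprof"

func main() {
	body1 := []byte( /* first scrape of a cumulative endpoint */ "")
	body2 := []byte( /* second scrape, counters grown since body1 */ "")

	first := &pprof.RawProfile{}
	// Push stores body1 on `first` and returns the RawProfile for the next
	// scrape, pre-seeded with PreviousProfile = body1 (cumulative = true).
	second := first.Push(body1, true)
	// When `second` is eventually ingested, the parser diffs its Profile
	// (body2) against its PreviousProfile (body1).
	_ = second.Push(body2, true)
}
```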
12 changes: 6 additions & 6 deletions pkg/scrape/config/config.go
@@ -88,33 +88,33 @@ func DefaultConfig() *Config {
},
"mutex": {
Path: "/debug/pprof/mutex",
- Params: url.Values{
- "seconds": []string{"10"},
- },
+ Params: nil,
SampleTypes: map[string]*profile.SampleTypeConfig{
"contentions": {
DisplayName: "mutex_count",
Units: metadata.LockSamplesUnits,
+ Cumulative: true,
},
"delay": {
DisplayName: "mutex_duration",
Units: metadata.LockNanosecondsUnits,
+ Cumulative: true,
},
},
},
"block": {
Path: "/debug/pprof/block",
- Params: url.Values{
- "seconds": []string{"10"},
- },
+ Params: nil,
SampleTypes: map[string]*profile.SampleTypeConfig{
"contentions": {
DisplayName: "block_count",
Units: metadata.LockSamplesUnits,
+ Cumulative: true,
},
"delay": {
DisplayName: "block_duration",
Units: metadata.LockNanosecondsUnits,
+ Cumulative: true,
},
},
},
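The restored defaults drop the `seconds` query parameter and flag the mutex/block sample types as cumulative again. These two changes go together; a note on the underlying net/http/pprof behaviour (general Go knowledge, not stated in the diff):

```go
// GET /debug/pprof/block?seconds=10  -> the endpoint itself computes a
//                                       10-second delta, so samples arrive
//                                       non-cumulative (the #1794 approach).
// GET /debug/pprof/block             -> totals since process start; marking
//                                       the sample types Cumulative tells the
//                                       parser to diff consecutive scrapes.
```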
21 changes: 9 additions & 12 deletions pkg/scrape/manager.go
@@ -45,22 +45,19 @@ type Manager struct {
targetSets map[string][]*targetgroup.Group

reloadC chan struct{}

- disableCumulativeMerge bool
}

// NewManager is the Manager constructor
- func NewManager(logger logrus.FieldLogger, p ingestion.Ingester, r prometheus.Registerer, disableCumulativeMerge bool) *Manager {
+ func NewManager(logger logrus.FieldLogger, p ingestion.Ingester, r prometheus.Registerer) *Manager {
c := make(map[string]*config.Config)
return &Manager{
- ingester: p,
- logger: logger,
- scrapeConfigs: c,
- scrapePools: make(map[string]*scrapePool),
- stop: make(chan struct{}),
- reloadC: make(chan struct{}, 1),
- metrics: newMetrics(r),
- disableCumulativeMerge: disableCumulativeMerge,
+ ingester: p,
+ logger: logger,
+ scrapeConfigs: c,
+ scrapePools: make(map[string]*scrapePool),
+ stop: make(chan struct{}),
+ reloadC: make(chan struct{}, 1),
+ metrics: newMetrics(r),
}
}

@@ -93,7 +90,7 @@ func (m *Manager) reload() {
Errorf("reloading target set")
continue
}
- sp, err := newScrapePool(scrapeConfig, m.ingester, m.logger, m.metrics, m.disableCumulativeMerge)
+ sp, err := newScrapePool(scrapeConfig, m.ingester, m.logger, m.metrics)
if err != nil {
m.logger.WithError(err).
WithField("scrape_pool", setName).
12 changes: 2 additions & 10 deletions pkg/scrape/scrape.go
@@ -65,11 +65,9 @@ type scrapePool struct {
// set of hashes.
activeTargets map[uint64]*Target
droppedTargets []*Target

- disableCumulativeMerge bool
}

- func newScrapePool(cfg *config.Config, p ingestion.Ingester, logger logrus.FieldLogger, m *metrics, disableCumulativeMerge bool) (*scrapePool, error) {
+ func newScrapePool(cfg *config.Config, p ingestion.Ingester, logger logrus.FieldLogger, m *metrics) (*scrapePool, error) {
m.pools.Inc()
client, err := config.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName)
if err != nil {
@@ -90,7 +88,6 @@ func newScrapePool(cfg *config.Config, p ingestion.Ingester, logger logrus.Field

metrics: m,
poolMetrics: m.poolMetrics(cfg.JobName),
- disableCumulativeMerge: disableCumulativeMerge,
}

return &sp, nil
@@ -108,8 +105,6 @@ func (sp *scrapePool) newScrapeLoop(s *scraper, i, t time.Duration) *scrapeLoop
delta: d,
interval: i,
timeout: t,

- disableCumulativeMerge: sp.disableCumulativeMerge,
}
x.ctx, x.cancel = context.WithCancel(sp.ctx)
return &x
@@ -329,8 +324,6 @@ type scrapeLoop struct {
delta time.Duration
interval time.Duration
timeout time.Duration

- disableCumulativeMerge bool
}

func (sl *scrapeLoop) run() {
@@ -428,8 +421,7 @@ func (sl *scrapeLoop) scrape(startTime, endTime time.Time) error {
}

profile := sl.scraper.profile
- sl.scraper.profile = profile.Push(buf.Bytes(), sl.scraper.cumulative, !sl.disableCumulativeMerge)
+ sl.scraper.profile = profile.Push(buf.Bytes(), sl.scraper.cumulative)
return sl.scraper.ingester.Ingest(ctx, &ingestion.IngestInput{
Profile: profile,
Metadata: ingestion.Metadata{
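The last hunk above is where the pull side consumes the reverted Push signature. Simplified from that hunk (error handling and metadata fields trimmed), the per-scrape handoff works like this:

```go
// `profile` is the RawProfile created during the previous scrape iteration.
profile := sl.scraper.profile
// Push records this scrape's bytes on `profile` and hands back the
// RawProfile for the next iteration (pre-seeded with PreviousProfile for
// cumulative types).
sl.scraper.profile = profile.Push(buf.Bytes(), sl.scraper.cumulative)
// The just-completed sample is ingested; any cumulative diffing happens
// later, in the parser, not in the scrape loop.
return sl.scraper.ingester.Ingest(ctx, &ingestion.IngestInput{Profile: profile})
```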
23 changes: 7 additions & 16 deletions pkg/server/ingest.go
@@ -9,8 +9,6 @@ import (
"strings"
"time"

"github.com/pyroscope-io/pyroscope/pkg/util/cumulativepprof"

"github.com/pyroscope-io/pyroscope/pkg/convert/speedscope"
"github.com/sirupsen/logrus"

Expand All @@ -30,25 +28,22 @@ type ingestHandler struct {
ingester ingestion.Ingester
onSuccess func(*ingestion.IngestInput)
httpUtils httputils.Utils

- disableCumulativeMerge bool
}

func (ctrl *Controller) ingestHandler() http.Handler {
return NewIngestHandler(ctrl.log, ctrl.ingestser, func(pi *ingestion.IngestInput) {
ctrl.StatsInc("ingest")
ctrl.StatsInc("ingest:" + pi.Metadata.SpyName)
ctrl.appStats.Add(hashString(pi.Metadata.Key.AppName()))
- }, ctrl.httpUtils, !ctrl.config.RemoteWrite.Enabled)
+ }, ctrl.httpUtils)
}

- func NewIngestHandler(log *logrus.Logger, p ingestion.Ingester, onSuccess func(*ingestion.IngestInput), httpUtils httputils.Utils, disableCumulativeMerge bool) http.Handler {
+ func NewIngestHandler(log *logrus.Logger, p ingestion.Ingester, onSuccess func(*ingestion.IngestInput), httpUtils httputils.Utils) http.Handler {
return ingestHandler{
- log: log,
- ingester: p,
- onSuccess: onSuccess,
- httpUtils: httpUtils,
- disableCumulativeMerge: disableCumulativeMerge,
+ log: log,
+ ingester: p,
+ onSuccess: onSuccess,
+ httpUtils: httpUtils,
}
}

@@ -164,16 +159,12 @@ func (h ingestHandler) ingestInputFromRequest(r *http.Request) (*ingestion.Inges
}

case strings.Contains(contentType, "multipart/form-data"):
p := &pprof.RawProfile{
input.Profile = &pprof.RawProfile{
FormDataContentType: contentType,
RawData: b,
StreamingParser: true,
PoolStreamingParser: true,
}
- if !h.disableCumulativeMerge {
- p.MergeCumulative(cumulativepprof.NewMergers())
- }
- input.Profile = p
}

if input.Profile == nil {
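With the handler-side merge removed, a multipart ingest request simply carries the raw payloads and the parser does the rest. A sketch of building such a request, assuming the form-field names from the const block in pkg/convert/pprof/profile.go above (`buildIngestBody` and the payload arguments are hypothetical, not part of the codebase):

```go
package main

import (
	"bytes"
	"mime/multipart"
)

// buildIngestBody assembles the multipart body the /ingest endpoint reads:
// "profile" for the current pprof payload and "prev_profile" for the
// previous one; cur/prev stand in for two serialized pprof profiles.
func buildIngestBody(cur, prev []byte) (*bytes.Buffer, string, error) {
	var body bytes.Buffer
	w := multipart.NewWriter(&body)
	fw, err := w.CreateFormFile("profile", "profile.pprof")
	if err != nil {
		return nil, "", err
	}
	fw.Write(cur)
	pw, err := w.CreateFormFile("prev_profile", "profile.pprof")
	if err != nil {
		return nil, "", err
	}
	pw.Write(prev)
	if err := w.Close(); err != nil {
		return nil, "", err
	}
	// POST `body` to /ingest with Content-Type: w.FormDataContentType().
	return &body, w.FormDataContentType(), nil
}
```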

