diff --git a/go.mod b/go.mod index 09ac464f90..fc01cccdba 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/davecgh/go-spew v1.1.1 github.com/dgraph-io/badger/v2 v2.2007.2 github.com/dgrijalva/jwt-go v3.2.0+incompatible - github.com/dgrijalva/lfu-go v0.0.0-20141010002404-f174e76c5138 + github.com/dgrijalva/lfu-go v0.0.0-00010101000000-000000000000 github.com/fatih/color v1.10.0 github.com/felixge/fgprof v0.9.1 github.com/go-ole/go-ole v1.2.5 // indirect @@ -58,4 +58,4 @@ require ( replace github.com/mgechev/revive v1.0.3 => github.com/pyroscope-io/revive v1.0.6-0.20210330033039-4a71146f9dc1 -replace github.com/dgrijalva/lfu-go => github.com/pyroscope-io/lfu-go v1.0.3 +replace github.com/dgrijalva/lfu-go => github.com/pyroscope-io/lfu-go v1.0.4-0.20210705073127-18ae647b0d6e diff --git a/go.sum b/go.sum index 54b8c8333c..e605db3844 100644 --- a/go.sum +++ b/go.sum @@ -379,8 +379,8 @@ github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3x github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/pyroscope-io/dotnetdiag v1.2.1 h1:3XEMrfFJnZ87BiEhozyQKmCUAuMd/Spq7KChPuD2Cf0= github.com/pyroscope-io/dotnetdiag v1.2.1/go.mod h1:eFUEHCp4eD1TgcXMlJihC+R4MrqGf7nTRdWxNADbDHA= -github.com/pyroscope-io/lfu-go v1.0.3 h1:jWwHf5MtfHioSYga2LrBEjmXluUloNQrKa3XXLFqjMA= -github.com/pyroscope-io/lfu-go v1.0.3/go.mod h1:3W9sGrDLhKFkHZPXkz6c5dAKrxcwkKbFFKnJtDukMDA= +github.com/pyroscope-io/lfu-go v1.0.4-0.20210705073127-18ae647b0d6e h1:r17BNAAXLsBRJWeRyROGSxont3ob/GmF4Qz2e68+Rfc= +github.com/pyroscope-io/lfu-go v1.0.4-0.20210705073127-18ae647b0d6e/go.mod h1:3W9sGrDLhKFkHZPXkz6c5dAKrxcwkKbFFKnJtDukMDA= github.com/pyroscope-io/revive v1.0.6-0.20210330033039-4a71146f9dc1 h1:0v9lBNgdmVtpyyk9PP/DfpJlOHkXriu5YgNlrhQw5YE= github.com/pyroscope-io/revive v1.0.6-0.20210330033039-4a71146f9dc1/go.mod h1:tSw34BaGZ0iF+oVKDOjq1/LuxGifgW7shaJ6+dBYFXg= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= diff --git a/pkg/agent/profiler/profiler.go b/pkg/agent/profiler/profiler.go index 924bae2d3d..759b8af3d6 100644 --- a/pkg/agent/profiler/profiler.go +++ b/pkg/agent/profiler/profiler.go @@ -24,6 +24,7 @@ var ( type Config struct { ApplicationName string // e.g backend.purchases + Tags map[string]string ServerAddress string // e.g http://pyroscope.services.internal:4040 AuthToken string // specify this token when using pyroscope cloud SampleRate uint32 @@ -62,6 +63,7 @@ func Start(cfg Config) (*Profiler, error) { sc := agent.SessionConfig{ Upstream: upstream, AppName: cfg.ApplicationName, + Tags: cfg.Tags, ProfilingTypes: types.DefaultProfileTypes, DisableGCRuns: cfg.DisableGCRuns, SpyName: types.GoSpy, @@ -70,14 +72,15 @@ func Start(cfg Config) (*Profiler, error) { Pid: 0, WithSubprocesses: false, } - session := agent.NewSession(&sc, cfg.Logger) - if err := session.Start(); err != nil { - return nil, fmt.Errorf("start session: %v", err) + session, err := agent.NewSession(&sc, cfg.Logger) + if err != nil { + return nil, fmt.Errorf("new session: %w", err) + } + if err = session.Start(); err != nil { + return nil, fmt.Errorf("start session: %w", err) } - return &Profiler{ - session: session, - }, nil + return &Profiler{session: session}, nil } // Stop stops continious profiling session diff --git a/pkg/agent/session.go b/pkg/agent/session.go index 7d3e185dc7..2ed7e89a3f 100644 --- a/pkg/agent/session.go +++ b/pkg/agent/session.go @@ -16,6 +16,8 @@ import ( _ 
"github.com/pyroscope-io/pyroscope/pkg/agent/rbspy" "github.com/pyroscope-io/pyroscope/pkg/agent/types" "github.com/pyroscope-io/pyroscope/pkg/agent/upstream" + "github.com/pyroscope-io/pyroscope/pkg/flameql" + "github.com/pyroscope-io/pyroscope/pkg/storage/segment" "github.com/pyroscope-io/pyroscope/pkg/util/slices" // revive:enable:blank-imports @@ -47,11 +49,16 @@ type ProfileSession struct { stopTime time.Time logger Logger + + // The map holds fully qualified names (storage keys) for + // every profiler that is running within the session. + names map[spy.ProfileType]string } type SessionConfig struct { Upstream upstream.Upstream AppName string + Tags map[string]string ProfilingTypes []spy.ProfileType DisableGCRuns bool SpyName string @@ -61,10 +68,11 @@ type SessionConfig struct { WithSubprocesses bool } -func NewSession(c *SessionConfig, logger Logger) *ProfileSession { +func NewSession(c *SessionConfig, logger Logger) (*ProfileSession, error) { ps := &ProfileSession{ upstream: c.Upstream, appName: c.AppName, + names: make(map[spy.ProfileType]string), spyName: c.SpyName, profileTypes: c.ProfilingTypes, disableGCRuns: c.DisableGCRuns, @@ -84,7 +92,64 @@ func NewSession(c *SessionConfig, logger Logger) *ProfileSession { ps.tries = make([]*transporttrie.Trie, 1) } - return ps + if err := ps.createNames(c.Tags); err != nil { + return nil, err + } + + return ps, nil +} + +func (ps *ProfileSession) createNames(tags map[string]string) error { + for _, t := range ps.profileTypes { + tagsCopy := make(map[string]string) + for k, v := range tags { + tagsCopy[k] = v + } + appName, err := mergeTagsWithAppName(ps.appName, tagsCopy) + if err != nil { + return err + } + tagsCopy["__name__"] = appName + "." + string(t) + ps.names[t] = segment.NewKey(tagsCopy).Normalized() + } + return nil +} + +// mergeTagsWithAppName validates user input and merges explicitly specified +// tags with tags from app name. +// +// App name may be in the full form including tags (app.name{foo=bar,baz=qux}). +// Returned application name is always short, any tags that were included are +// moved to tags map. When merged with explicitly provided tags (config/CLI), +// last take precedence. +// +// App name may be an empty string. Tags must not contain reserved keys, +// the map is modified in place. +func mergeTagsWithAppName(appName string, tags map[string]string) (string, error) { + k, err := segment.ParseKey(appName) + if err != nil { + return "", err + } + appName = k.AppName() + if tags == nil { + return appName, nil + } + // Note that at this point k may contain + // reserved tag keys (e.g. '__name__'). + for tagKey, v := range k.Labels() { + if flameql.IsTagKeyReserved(tagKey) { + continue + } + if _, ok := tags[tagKey]; !ok { + tags[tagKey] = v + } + } + for tagKey := range tags { + if err = flameql.ValidateTagKey(tagKey); err != nil { + return "", err + } + } + return appName, nil } func (ps *ProfileSession) takeSnapshots() { @@ -223,9 +288,8 @@ func (ps *ProfileSession) uploadTries(now time.Time) { } if !skipUpload { - name := ps.appName + "." + string(ps.profileTypes[i]) ps.upstream.Upload(&upstream.UploadJob{ - Name: name, + Name: ps.names[ps.profileTypes[i]], StartTime: ps.startTime, EndTime: endTime, SpyName: ps.spyName, diff --git a/pkg/agent/session_test.go b/pkg/agent/session_test.go index ba9895b255..cf354104ef 100644 --- a/pkg/agent/session_test.go +++ b/pkg/agent/session_test.go @@ -8,61 +8,84 @@ import ( . 
"github.com/onsi/gomega" "github.com/pyroscope-io/pyroscope/pkg/agent/spy" "github.com/pyroscope-io/pyroscope/pkg/agent/upstream" - "github.com/pyroscope-io/pyroscope/pkg/config" - "github.com/pyroscope-io/pyroscope/pkg/structs/transporttrie" - "github.com/pyroscope-io/pyroscope/pkg/testing" "github.com/sirupsen/logrus" ) -const durThreshold = 30 * time.Millisecond - type upstreamMock struct { - tries []*transporttrie.Trie + uploads []*upstream.UploadJob } -func (*upstreamMock) Stop() { - -} +func (*upstreamMock) Stop() {} func (u *upstreamMock) Upload(j *upstream.UploadJob) { - u.tries = append(u.tries, j.Trie) + u.uploads = append(u.uploads, j) } var _ = Describe("agent.Session", func() { - testing.WithConfig(func(cfg **config.Config) { - Describe("NewSession", func() { - It("creates a new session and performs chunking", func(done Done) { + Describe("NewSession", func() { + It("creates a new session and performs chunking", func(done Done) { + u := &upstreamMock{} + uploadRate := 200 * time.Millisecond + s, _ := NewSession(&SessionConfig{ + Upstream: u, + AppName: "test-app", + ProfilingTypes: []spy.ProfileType{spy.ProfileCPU}, + SpyName: "debugspy", + SampleRate: 100, + UploadRate: uploadRate, + Pid: os.Getpid(), + WithSubprocesses: true, + }, logrus.StandardLogger()) + now := time.Now() + time.Sleep(now.Truncate(uploadRate).Add(uploadRate + 10*time.Millisecond).Sub(now)) + err := s.Start() + + Expect(err).ToNot(HaveOccurred()) + time.Sleep(500 * time.Millisecond) + s.Stop() + + Expect(u.uploads).To(HaveLen(3)) + u.uploads[0].Trie.Iterate(func(name []byte, val uint64) { + Expect(val).To(BeNumerically("~", 19, 2)) + }) + u.uploads[1].Trie.Iterate(func(name []byte, val uint64) { + Expect(val).To(BeNumerically("~", 20, 2)) + }) + u.uploads[2].Trie.Iterate(func(name []byte, val uint64) { + Expect(val).To(BeNumerically("~", 11, 2)) + }) + close(done) + }) + + When("tags specified", func() { + It("name ", func() { u := &upstreamMock{} uploadRate := 200 * time.Millisecond - s := NewSession(&SessionConfig{ + c := &SessionConfig{ Upstream: u, - AppName: "test-app", + AppName: "test-app{bar=xxx}", ProfilingTypes: []spy.ProfileType{spy.ProfileCPU}, SpyName: "debugspy", SampleRate: 100, UploadRate: uploadRate, Pid: os.Getpid(), WithSubprocesses: true, - }, logrus.StandardLogger()) + Tags: map[string]string{ + "foo": "bar", + "baz": "qux", + }, + } + + s, _ := NewSession(c, logrus.StandardLogger()) now := time.Now() time.Sleep(now.Truncate(uploadRate).Add(uploadRate + 10*time.Millisecond).Sub(now)) err := s.Start() - Expect(err).ToNot(HaveOccurred()) time.Sleep(500 * time.Millisecond) s.Stop() - Expect(u.tries).To(HaveLen(3)) - u.tries[0].Iterate(func(name []byte, val uint64) { - Expect(val).To(BeNumerically("~", 19, 2)) - }) - u.tries[1].Iterate(func(name []byte, val uint64) { - Expect(val).To(BeNumerically("~", 20, 2)) - }) - u.tries[2].Iterate(func(name []byte, val uint64) { - Expect(val).To(BeNumerically("~", 11, 2)) - }) - close(done) + Expect(u.uploads).To(HaveLen(3)) + Expect(u.uploads[0].Name).To(Equal("test-app.cpu{bar=xxx,baz=qux,foo=bar}")) }) }) }) diff --git a/pkg/agent/target/service.go b/pkg/agent/target/service.go index b01f908713..89a2bf376b 100644 --- a/pkg/agent/target/service.go +++ b/pkg/agent/target/service.go @@ -35,6 +35,7 @@ func newServiceTarget(logger *logrus.Logger, upstream *remote.Remote, t config.T sc: &agent.SessionConfig{ Upstream: upstream, AppName: t.ApplicationName, + Tags: t.Tags, ProfilingTypes: []spy.ProfileType{spy.ProfileCPU}, SpyName: t.SpyName, SampleRate: 
uint32(t.SampleRate), @@ -68,7 +69,10 @@ func (s *service) wait(ctx context.Context) error { pyspy.Blocking = s.target.PyspyBlocking rbspy.Blocking = s.target.RbspyBlocking - session := agent.NewSession(s.sc, s.logger) + session, err := agent.NewSession(s.sc, s.logger) + if err != nil { + return err + } if err := session.Start(); err != nil { return err } diff --git a/pkg/agent/upstream/direct/direct.go b/pkg/agent/upstream/direct/direct.go index 0a6ade8357..595052cad3 100644 --- a/pkg/agent/upstream/direct/direct.go +++ b/pkg/agent/upstream/direct/direct.go @@ -8,6 +8,7 @@ import ( "github.com/pyroscope-io/pyroscope/pkg/agent/upstream" "github.com/pyroscope-io/pyroscope/pkg/storage" + "github.com/pyroscope-io/pyroscope/pkg/storage/segment" "github.com/pyroscope-io/pyroscope/pkg/storage/tree" ) @@ -51,7 +52,7 @@ func (u *Direct) Upload(j *upstream.UploadJob) { } func (u *Direct) uploadProfile(j *upstream.UploadJob) { - key, err := storage.ParseKey(j.Name) + key, err := segment.ParseKey(j.Name) if err != nil { logrus.WithField("key", key).Error("invalid key:") return diff --git a/pkg/cli/agent.go b/pkg/cli/agent.go index 9e0c015abc..1a7db68368 100644 --- a/pkg/cli/agent.go +++ b/pkg/cli/agent.go @@ -51,7 +51,10 @@ func (svc *agentService) Stop(_ service.Service) error { return nil } -func loadTargets(c *config.Agent) error { +// loadAgentConfig is a hack for ff parser, which can't parse maps, structs, +// and slices. The function to be called after the parser finishes just to +// fill missing configuration elements. +func loadAgentConfig(c *config.Agent) error { b, err := ioutil.ReadFile(c.Config) switch { case err == nil: @@ -64,10 +67,28 @@ func loadTargets(c *config.Agent) error { if err = yaml.Unmarshal(b, &a); err != nil { return err } - c.Targets = a.Targets + // Override tags from config file with flags. + c.Tags = mergeTags(a.Tags, c.Tags) + for _, t := range a.Targets { + t.Tags = mergeTags(t.Tags, c.Tags) + c.Targets = append(c.Targets, t) + } return nil } +// mergeTags creates a new map with tags from a and b. +// Values from b take precedence. Returned map is never nil. +func mergeTags(a, b map[string]string) map[string]string { + t := make(map[string]string, len(a)) + for k, v := range a { + t[k] = v + } + for k, v := range b { + t[k] = v + } + return t +} + func createLogger(config *config.Agent) (*logrus.Logger, error) { if config.NoLogging { logrus.SetOutput(ioutil.Discard) diff --git a/pkg/cli/agent_windows.go b/pkg/cli/agent_windows.go index d98d3ed191..269bf36d3a 100644 --- a/pkg/cli/agent_windows.go +++ b/pkg/cli/agent_windows.go @@ -13,7 +13,7 @@ func startAgent(config *config.Agent) error { return fmt.Errorf("could not create logger: %w", err) } logger.Info("starting pyroscope agent") - if err = loadTargets(config); err != nil { + if err = loadAgentConfig(config); err != nil { return fmt.Errorf("could not load targets: %w", err) } agent, err := newAgentService(logger, config) diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 79d3941fc3..c2456f1ad6 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -140,7 +140,6 @@ func generateRootCmd(cfg *config.Config) *ffcli.Command { fmt.Println(DefaultUsageFunc(execSortedFlags, execCmd, []string{})) return nil } - err := exec.Cli(&cfg.Exec, args) // Normally, if the program ran, the call should return ExitError and // the exit code must be preserved. 
Otherwise, the error originates from @@ -163,7 +162,6 @@ func generateRootCmd(cfg *config.Config) *ffcli.Command { fmt.Println(DefaultUsageFunc(connectSortedFlags, connectCmd, []string{})) return nil } - return exec.Cli(&cfg.Exec, args) } diff --git a/pkg/cli/flags.go b/pkg/cli/flags.go index d1b8616d09..2016a57753 100644 --- a/pkg/cli/flags.go +++ b/pkg/cli/flags.go @@ -2,17 +2,19 @@ package cli import ( "flag" + "fmt" "reflect" "strconv" "strings" "time" "github.com/iancoleman/strcase" + "github.com/sirupsen/logrus" + "github.com/pyroscope-io/pyroscope/pkg/agent/spy" "github.com/pyroscope-io/pyroscope/pkg/util/bytesize" "github.com/pyroscope-io/pyroscope/pkg/util/duration" "github.com/pyroscope-io/pyroscope/pkg/util/slices" - "github.com/sirupsen/logrus" ) const timeFormat = "2006-01-02T15:04:05Z0700" @@ -56,6 +58,32 @@ func (tf *timeFlag) Set(value string) error { return nil } +type mapFlags map[string]string + +func (m mapFlags) String() string { + if len(m) == 0 { + return "{}" + } + // Cast to map to avoid recursion. + return fmt.Sprint((map[string]string)(m)) +} + +func (m *mapFlags) Set(s string) error { + if len(s) == 0 { + return nil + } + v := strings.Split(s, "=") + if len(v) != 2 { + return fmt.Errorf("invalid flag %s: should be in key=value format", s) + } + if *m == nil { + *m = map[string]string{v[0]: v[1]} + } else { + (*m)[v[0]] = v[1] + } + return nil +} + type options struct { replacements map[string]string skip []string @@ -162,6 +190,10 @@ func PopulateFlagSet(obj interface{}, flagSet *flag.FlagSet, opts ...FlagOption) val := fieldV.Addr().Interface().(*[]string) val2 := (*arrayFlags)(val) flagSet.Var(val2, nameVal, descVal) + case reflect.TypeOf(map[string]string{}): + val := fieldV.Addr().Interface().(*map[string]string) + val2 := (*mapFlags)(val) + flagSet.Var(val2, nameVal, descVal) case reflect.TypeOf(""): val := fieldV.Addr().Interface().(*string) for old, n := range o.replacements { diff --git a/pkg/cli/flags_test.go b/pkg/cli/flags_test.go index d930fdc5ca..3df782e931 100644 --- a/pkg/cli/flags_test.go +++ b/pkg/cli/flags_test.go @@ -137,6 +137,7 @@ var _ = Describe("flags", func() { err := exampleCommand.ParseAndRun(context.Background(), []string{ "-config", "testdata/agent.yml", + "-tag", "baz=zzz", }) Expect(err).ToNot(HaveOccurred()) @@ -148,18 +149,76 @@ var _ = Describe("flags", func() { AuthToken: "", UpstreamThreads: 4, UpstreamRequestTimeout: 10 * time.Second, + Tags: map[string]string{ + "baz": "zzz", + }, + })) + + Expect(loadAgentConfig(&cfg)).ToNot(HaveOccurred()) + Expect(cfg).To(Equal(config.Agent{ + Config: "testdata/agent.yml", + LogLevel: "debug", + NoLogging: false, + ServerAddress: "http://localhost:4040", + AuthToken: "", + UpstreamThreads: 4, + UpstreamRequestTimeout: 10 * time.Second, + Tags: map[string]string{ + "foo": "bar", + "baz": "zzz", + }, + Targets: []config.Target{ + { + ServiceName: "foo", + SpyName: "debugspy", + ApplicationName: "foo.app", + SampleRate: 0, + DetectSubprocesses: false, + PyspyBlocking: false, + RbspyBlocking: false, + Tags: map[string]string{ + "foo": "bar", + "baz": "zzz", + }, + }, + }, })) + }) + + It("parses tag flags in exec", func() { + exampleFlagSet := flag.NewFlagSet("example flag set", flag.ExitOnError) + var cfg config.Exec + PopulateFlagSet(&cfg, exampleFlagSet) + + exampleCommand := &ffcli.Command{ + FlagSet: exampleFlagSet, + Options: []ff.Option{ + ff.WithIgnoreUndefined(true), + ff.WithConfigFileParser(parser), + ff.WithConfigFileFlag("config"), + }, + Exec: func(_ context.Context, args 
[]string) error { + return nil + }, + } + + err := exampleCommand.ParseAndRun(context.Background(), []string{ + "-tag", "foo=bar", + "-tag", "baz=qux", + }) - Expect(loadTargets(&cfg)).ToNot(HaveOccurred()) - Expect(cfg.Targets).To(Equal([]config.Target{ - { - ServiceName: "foo", - SpyName: "debugspy", - ApplicationName: "foo.app", - SampleRate: 0, - DetectSubprocesses: false, - PyspyBlocking: false, - RbspyBlocking: false, + Expect(err).ToNot(HaveOccurred()) + Expect(cfg).To(Equal(config.Exec{ + SpyName: "auto", + SampleRate: 100, + DetectSubprocesses: true, + LogLevel: "info", + ServerAddress: "http://localhost:4040", + UpstreamThreads: 4, + UpstreamRequestTimeout: 10 * time.Second, + Tags: map[string]string{ + "foo": "bar", + "baz": "qux", }, })) }) diff --git a/pkg/cli/server.go b/pkg/cli/server.go index 14253036ac..eb12c33ab0 100644 --- a/pkg/cli/server.go +++ b/pkg/cli/server.go @@ -61,7 +61,7 @@ func newServerService(logger *logrus.Logger, c *config.Server) (*serverService, SampleRate: 100, UploadRate: 10 * time.Second, } - svc.selfProfiling = agent.NewSession(selfProfilingConfig, svc.logger) + svc.selfProfiling, _ = agent.NewSession(selfProfilingConfig, svc.logger) if !c.AnalyticsOptOut { svc.analyticsService = analytics.NewService(c, svc.storage, svc.controller) } diff --git a/pkg/cli/testdata/agent.yml b/pkg/cli/testdata/agent.yml index 8d4a090e1b..487da2eb6b 100644 --- a/pkg/cli/testdata/agent.yml +++ b/pkg/cli/testdata/agent.yml @@ -5,3 +5,7 @@ targets: - service-name: foo application-name: foo.app spy-name: debugspy + +tags: + foo: bar + baz: qux diff --git a/pkg/config/config.go b/pkg/config/config.go index 0e048370f2..f98c44cd7c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -28,7 +28,12 @@ type Agent struct { UpstreamThreads int `def:"4" desc:"number of upload threads"` UpstreamRequestTimeout time.Duration `def:"10s" desc:"profile upload timeout"` - Targets []Target `desc:"list of targets to be profiled"` + // Structs and slices are not parsed with ffcli package, + // instead `loadAgentConfig` function should be used. + Targets []Target `yaml:"targets" desc:"list of targets to be profiled"` + + // Note that in YAML the key is 'tags' but the flag is 'tag'. + Tags map[string]string `yaml:"tags" name:"tag" def:"" desc:"tag key value pairs"` } type Target struct { @@ -40,9 +45,12 @@ type Target struct { DetectSubprocesses bool `yaml:"detect-subprocesses" def:"true" desc:"makes pyroscope keep track of and profile subprocesses of the main process"` // Spy-specific settings. - PyspyBlocking bool `yaml:"pyspy-blocking" def:"false" desc:"enables blocking mode for pyspy"` RbspyBlocking bool `yaml:"rbspy-blocking" def:"false" desc:"enables blocking mode for rbspy"` + + // Tags are inherited from the agent level. At some point we may need + // specifying tags at the target level (override). + Tags map[string]string `yaml:"-"` } type Server struct { @@ -142,4 +150,6 @@ type Exec struct { GroupName string `def:"" desc:"starts process under specified group name"` PyspyBlocking bool `def:"false" desc:"enables blocking mode for pyspy"` RbspyBlocking bool `def:"false" desc:"enables blocking mode for rbspy"` + + Tags map[string]string `name:"tag" def:"" desc:"tag in key=value form. 
The flag may be specified multiple times"` } diff --git a/pkg/dbmanager/cli.go b/pkg/dbmanager/cli.go index 705d9eaa9c..4917994cb0 100644 --- a/pkg/dbmanager/cli.go +++ b/pkg/dbmanager/cli.go @@ -15,6 +15,7 @@ import ( "github.com/pyroscope-io/pyroscope/pkg/agent/upstream/direct" "github.com/pyroscope-io/pyroscope/pkg/config" "github.com/pyroscope-io/pyroscope/pkg/storage" + "github.com/pyroscope-io/pyroscope/pkg/storage/segment" ) func Cli(dbCfg *config.DbManager, srvCfg *config.Server, args []string) error { @@ -78,12 +79,12 @@ func copyData(dbCfg *config.DbManager, srvCfg *config.Server) error { SampleRate: 100, UploadRate: 10 * time.Second, } - session := agent.NewSession(selfProfilingConfig, logrus.StandardLogger()) + session, _ := agent.NewSession(selfProfilingConfig, logrus.StandardLogger()) upstream.Start() _ = session.Start() } - sk, err := storage.ParseKey(appName) + sk, err := segment.ParseKey(appName) if err != nil { return err } diff --git a/pkg/exec/cli.go b/pkg/exec/cli.go index 30dfb74e27..934785eaa6 100644 --- a/pkg/exec/cli.go +++ b/pkg/exec/cli.go @@ -150,6 +150,7 @@ func Cli(cfg *config.Exec, args []string) error { sc := agent.SessionConfig{ Upstream: u, AppName: cfg.ApplicationName, + Tags: cfg.Tags, ProfilingTypes: []spy.ProfileType{spy.ProfileCPU}, SpyName: spyName, SampleRate: uint32(cfg.SampleRate), @@ -157,9 +158,12 @@ func Cli(cfg *config.Exec, args []string) error { Pid: pid, WithSubprocesses: cfg.DetectSubprocesses, } - session := agent.NewSession(&sc, logrus.StandardLogger()) + session, err := agent.NewSession(&sc, logrus.StandardLogger()) + if err != nil { + return fmt.Errorf("new session: %w", err) + } if err = session.Start(); err != nil { - return fmt.Errorf("start session: %v", err) + return fmt.Errorf("start session: %w", err) } defer session.Stop() diff --git a/pkg/flameql/error.go b/pkg/flameql/error.go new file mode 100644 index 0000000000..b833af1b8a --- /dev/null +++ b/pkg/flameql/error.go @@ -0,0 +1,37 @@ +package flameql + +import ( + "errors" + "fmt" +) + +var ( + ErrInvalidQuerySyntax = errors.New("invalid query syntax") + ErrInvalidAppName = errors.New("invalid application name") + ErrInvalidMatchersSyntax = errors.New("invalid tag matchers syntax") + ErrInvalidTagKey = errors.New("invalid tag key") + ErrInvalidTagValueSyntax = errors.New("invalid tag value syntax") + + ErrAppNameIsRequired = errors.New("application name is required") + ErrTagKeyIsRequired = errors.New("tag key is required") + ErrTagKeyReserved = errors.New("tag key is reserved") + + ErrMatchOperatorIsRequired = errors.New("match operator is required") + ErrUnknownOp = errors.New("unknown tag match operator") +) + +type Error struct { + Inner error + Expr string + // TODO: add offset? +} + +func newErr(err error, expr string) *Error { return &Error{Inner: err, Expr: expr} } + +func (e *Error) Error() string { return e.Inner.Error() + ": " + e.Expr } + +func (e *Error) Unwrap() error { return e.Inner } + +func newInvalidTagKeyRuneError(k string, r rune) *Error { + return newErr(ErrInvalidTagKey, fmt.Sprintf("%s: character is not allowed: %q", k, r)) +} diff --git a/pkg/flameql/parser.go b/pkg/flameql/parser.go new file mode 100644 index 0000000000..b82d9d366f --- /dev/null +++ b/pkg/flameql/parser.go @@ -0,0 +1,199 @@ +package flameql + +import ( + "regexp" + "strings" +) + +type Query struct { + AppName string + Matchers []*TagMatcher + + q string // The original query string. 
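+ // A parsed query keeps the app name and the tag matchers separately,
+ // e.g. `app.name{foo="bar",baz!="qux"}` yields AppName "app.name" and
+ // two matchers, while q preserves the caller's input for String().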
+} + +func (q *Query) String() string { return q.q } + +type TagMatcher struct { + Key string + Value string + Op + + R *regexp.Regexp +} + +type Op int + +const ( + _ Op = iota + EQL // = + NEQ // != + EQL_REGEX // =~ + NEQ_REGEX // !~ +) + +// ParseQuery parses a string of $app_name<{<$tag_matchers>}> form. +func ParseQuery(s string) (*Query, error) { + s = strings.TrimSpace(s) + q := Query{q: s} + + for offset, c := range s { + switch c { + case '{': + if offset == 0 { + return nil, ErrAppNameIsRequired + } + if s[len(s)-1] != '}' { + return nil, newErr(ErrInvalidQuerySyntax, "expected } at the end") + } + m, err := ParseMatchers(s[offset+1 : len(s)-1]) + if err != nil { + return nil, err + } + q.AppName = s[:offset] + q.Matchers = m + return &q, nil + default: + if !IsTagKeyRuneAllowed(c) { + return nil, newErr(ErrInvalidAppName, s[:offset+1]) + } + } + } + + if len(s) == 0 { + return nil, ErrAppNameIsRequired + } + + q.AppName = s + return &q, nil +} + +// ParseMatchers parses a string of $tag_matcher<,$tag_matchers> form. +func ParseMatchers(s string) ([]*TagMatcher, error) { + var matchers []*TagMatcher + for _, t := range split(s) { + if t == "" { + continue + } + m, err := ParseMatcher(strings.TrimSpace(t)) + if err != nil { + return nil, err + } + matchers = append(matchers, m) + } + if len(matchers) == 0 && len(s) != 0 { + return nil, newErr(ErrInvalidMatchersSyntax, s) + } + return matchers, nil +} + +// ParseMatcher parses a string of $tag_key$op"$tag_value" form, +// where $op is one of the supported match operators. +func ParseMatcher(s string) (*TagMatcher, error) { + var tm TagMatcher + var offset int + var c rune + +loop: + for offset, c = range s { + r := len(s) - (offset + 1) + switch c { + case '=': + switch { + case r <= 2: + return nil, newErr(ErrInvalidTagValueSyntax, s) + case s[offset+1] == '"': + tm.Op = EQL + case s[offset+1] == '~': + if r <= 3 { + return nil, newErr(ErrInvalidTagValueSyntax, s) + } + tm.Op = EQL_REGEX + default: + // Just for more meaningful error message. + if s[offset+2] != '"' { + return nil, newErr(ErrInvalidTagValueSyntax, s) + } + return nil, newErr(ErrUnknownOp, s) + } + break loop + case '!': + if r <= 3 { + return nil, newErr(ErrInvalidTagValueSyntax, s) + } + switch s[offset+1] { + case '=': + tm.Op = NEQ + case '~': + tm.Op = NEQ_REGEX + default: + return nil, newErr(ErrUnknownOp, s) + } + break loop + default: + if !IsTagKeyRuneAllowed(c) { + return nil, newInvalidTagKeyRuneError(s, c) + } + } + } + + k := s[:offset] + if IsTagKeyReserved(k) { + return nil, newErr(ErrTagKeyReserved, k) + } + + var v string + var ok bool + switch tm.Op { + default: + return nil, newErr(ErrMatchOperatorIsRequired, s) + case EQL: + v, ok = unquote(s[offset+1:]) + case NEQ, EQL_REGEX, NEQ_REGEX: + v, ok = unquote(s[offset+2:]) + } + if !ok { + return nil, newErr(ErrInvalidTagValueSyntax, v) + } + + // Compile regex, if applicable. 
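+ // For example, foo=~"bar.*" parses to Op == EQL_REGEX with R compiled
+ // from "bar.*"; an invalid pattern such as "[" is returned as a
+ // *flameql.Error wrapping the underlying regexp syntax error.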
+ switch tm.Op { + case EQL_REGEX, NEQ_REGEX: + r, err := regexp.Compile(v) + if err != nil { + return nil, newErr(err, v) + } + tm.R = r + } + + tm.Key = k + tm.Value = v + return &tm, nil +} + +func unquote(s string) (string, bool) { + if s[0] != '"' || s[len(s)-1] != '"' { + return s, false + } + return s[1 : len(s)-1], true +} + +func split(s string) []string { + var r []string + var x int + var y bool + for i := 0; i < len(s); i++ { + switch { + case s[i] == ',' && !y: + r = append(r, s[x:i]) + x = i + 1 + case s[i] == '"': + if y && i > 0 && s[i-1] != '\\' { + y = false + continue + } + y = true + } + } + return append(r, s[x:]) +} diff --git a/pkg/flameql/parser_suite_test.go b/pkg/flameql/parser_suite_test.go new file mode 100644 index 0000000000..dfee75a056 --- /dev/null +++ b/pkg/flameql/parser_suite_test.go @@ -0,0 +1,13 @@ +package flameql_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestParser(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Parser Suite") +} diff --git a/pkg/flameql/parser_test.go b/pkg/flameql/parser_test.go new file mode 100644 index 0000000000..a8a73aa3b0 --- /dev/null +++ b/pkg/flameql/parser_test.go @@ -0,0 +1,133 @@ +package flameql + +import ( + "errors" + "regexp/syntax" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("ParseQuery", func() { + It("parses queries", func() { + type testCase struct { + query string + err error + q *Query + } + + testCases := []testCase{ + {`app.name`, nil, &Query{AppName: "app.name", q: `app.name`}}, + {`app.name{}`, nil, &Query{AppName: "app.name", q: `app.name{}`}}, + {`app.name{foo="bar"}`, nil, + &Query{"app.name", []*TagMatcher{{"foo", "bar", EQL, nil}}, `app.name{foo="bar"}`}}, + {`app.name{foo="bar,baz"}`, nil, + &Query{"app.name", []*TagMatcher{{"foo", "bar,baz", EQL, nil}}, `app.name{foo="bar,baz"}`}}, + {`app.name{foo="bar",baz!="quo"}`, nil, + &Query{"app.name", []*TagMatcher{{"foo", "bar", EQL, nil}, {"baz", "quo", NEQ, nil}}, `app.name{foo="bar",baz!="quo"}`}}, + + {"", ErrAppNameIsRequired, nil}, + {"{}", ErrAppNameIsRequired, nil}, + {`app.name{,}`, ErrInvalidMatchersSyntax, nil}, + {`app.name[foo="bar"]`, ErrInvalidAppName, nil}, + {`app=name{}"`, ErrInvalidAppName, nil}, + {`app.name{foo="bar"`, ErrInvalidQuerySyntax, nil}, + {`app.name{__name__="foo"}`, ErrTagKeyReserved, nil}, + } + + for _, tc := range testCases { + q, err := ParseQuery(tc.query) + if tc.err != nil { + Expect(errors.Is(err, tc.err)).To(BeTrue()) + } else { + Expect(err).To(BeNil()) + } + Expect(q).To(Equal(tc.q)) + } + }) +}) + +var _ = Describe("ParseMatcher", func() { + It("parses tag matchers", func() { + type testCase struct { + expr string + err error + m *TagMatcher + } + + testCases := []testCase{ + {expr: `foo="bar"`, m: &TagMatcher{"foo", "bar", EQL, nil}}, + {expr: `foo="z"`, m: &TagMatcher{"foo", "z", EQL, nil}}, + {expr: `foo=""`, err: ErrInvalidTagValueSyntax}, + {expr: `foo="`, err: ErrInvalidTagValueSyntax}, + {expr: `foo="z`, err: ErrInvalidTagValueSyntax}, + {expr: `foo=`, err: ErrInvalidTagValueSyntax}, + + {expr: `foo!="bar"`, m: &TagMatcher{"foo", "bar", NEQ, nil}}, + {expr: `foo!="z"`, m: &TagMatcher{"foo", "z", NEQ, nil}}, + {expr: `foo=~""`, err: ErrInvalidTagValueSyntax}, + {expr: `foo=~"`, err: ErrInvalidTagValueSyntax}, + {expr: `foo=~"z`, err: ErrInvalidTagValueSyntax}, + {expr: `foo=~`, err: ErrInvalidTagValueSyntax}, + + {expr: `foo=~"bar"`, m: &TagMatcher{"foo", "bar", EQL_REGEX, nil}}, + {expr: `foo=~"z"`, 
m: &TagMatcher{"foo", "z", EQL_REGEX, nil}}, + {expr: `foo!=""`, err: ErrInvalidTagValueSyntax}, + {expr: `foo!="`, err: ErrInvalidTagValueSyntax}, + {expr: `foo!="z`, err: ErrInvalidTagValueSyntax}, + {expr: `foo!=`, err: ErrInvalidTagValueSyntax}, + + {expr: `foo!~"bar"`, m: &TagMatcher{"foo", "bar", NEQ_REGEX, nil}}, + {expr: `foo!~"z"`, m: &TagMatcher{"foo", "z", NEQ_REGEX, nil}}, + {expr: `foo!~""`, err: ErrInvalidTagValueSyntax}, + {expr: `foo!~"`, err: ErrInvalidTagValueSyntax}, + {expr: `foo!~"z`, err: ErrInvalidTagValueSyntax}, + {expr: `foo!~`, err: ErrInvalidTagValueSyntax}, + + {expr: `foo="bar,baz"`, m: &TagMatcher{"foo", "bar,baz", EQL, nil}}, + {expr: `foo="bar\",\"baz"`, m: &TagMatcher{"foo", "bar\\\",\\\"baz", EQL, nil}}, + + {expr: `foo;bar="baz"`, err: ErrInvalidTagKey}, + {expr: `foo""`, err: ErrInvalidTagKey}, + {expr: `foo`, err: ErrMatchOperatorIsRequired}, + {expr: `foo!`, err: ErrInvalidTagValueSyntax}, + {expr: `foo!!`, err: ErrInvalidTagValueSyntax}, + {expr: `foo!~@b@"`, err: ErrInvalidTagValueSyntax}, + {expr: `foo=bar`, err: ErrInvalidTagValueSyntax}, + {expr: `foo!"bar"`, err: ErrUnknownOp}, + {expr: `foo!!"bar"`, err: ErrUnknownOp}, + {expr: `foo=="bar"`, err: ErrUnknownOp}, + } + + for _, tc := range testCases { + tm, err := ParseMatcher(tc.expr) + if tc.err != nil { + Expect(errors.Is(err, tc.err)).To(BeTrue()) + } else { + Expect(err).To(BeNil()) + } + if tm != nil { + tm.R = nil + } + Expect(tm).To(Equal(tc.m)) + } + }) +}) + +var _ = Describe("ParseMatcherRegex", func() { + It("parses tag matchers with regex", func() { + m, err := ParseMatcher(`foo=~".*_suffix"`) + Expect(err).To(BeNil()) + Expect(m).ToNot(BeNil()) + + m, err = ParseMatcher(`foo=~"["`) + Expect(m).To(BeNil()) + Expect(err).ToNot(BeNil()) + + var e1 *syntax.Error + Expect(errors.As(err, &e1)).To(BeTrue()) + + var e2 *Error + Expect(errors.As(err, &e2)).To(BeTrue()) + }) +}) diff --git a/pkg/flameql/tag_key.go b/pkg/flameql/tag_key.go new file mode 100644 index 0000000000..1a4f574af6 --- /dev/null +++ b/pkg/flameql/tag_key.go @@ -0,0 +1,46 @@ +package flameql + +import ( + "unicode" +) + +const ( + ReservedTagKeyName = "__name__" +) + +var reservedTagKeys = []string{ + ReservedTagKeyName, +} + +// ValidateTagKey report an error if the given key k violates constraints. +// +// The function should be used to validate user input. The function returns +// ErrTagKeyReserved if the key is valid but reserved for internal use. +func ValidateTagKey(k string) error { + if len(k) == 0 { + return ErrTagKeyIsRequired + } + for _, r := range k { + if !IsTagKeyRuneAllowed(r) { + return newInvalidTagKeyRuneError(k, r) + } + } + if IsTagKeyReserved(k) { + return newErr(ErrTagKeyReserved, k) + } + return nil +} + +func IsTagKeyRuneAllowed(r rune) bool { + return unicode.IsDigit(r) || unicode.IsLetter(r) || + r == '-' || r == '_' || r == '.' || r == '/' +} + +func IsTagKeyReserved(k string) bool { + for _, s := range reservedTagKeys { + if s == k { + return true + } + } + return false +} diff --git a/pkg/flameql/tag_key_test.go b/pkg/flameql/tag_key_test.go new file mode 100644 index 0000000000..2af1cc999c --- /dev/null +++ b/pkg/flameql/tag_key_test.go @@ -0,0 +1,34 @@ +package flameql + +import ( + "errors" + + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" +) + +var _ = Describe("ValidateTagKey", func() { + It("reports error if a key violates constraints", func() { + type testCase struct { + key string + err error + } + + testCases := []testCase{ + {"foo/BAR.1-2.baz_qux", nil}, + + {ReservedTagKeyName, ErrTagKeyReserved}, + {"", ErrTagKeyIsRequired}, + {"#", ErrInvalidTagKey}, + } + + for _, tc := range testCases { + err := ValidateTagKey(tc.key) + if tc.err != nil { + Expect(errors.Is(err, tc.err)).To(BeTrue()) + continue + } + Expect(err).To(BeNil()) + } + }) +}) diff --git a/pkg/server/build.go b/pkg/server/build.go index d295008ebb..d1e03077ef 100644 --- a/pkg/server/build.go +++ b/pkg/server/build.go @@ -6,7 +6,7 @@ import ( "github.com/pyroscope-io/pyroscope/pkg/build" ) -func (ctrl *Controller) buildHandler(w http.ResponseWriter, r *http.Request) { +func (ctrl *Controller) buildHandler(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") - w.Write([]byte(build.PrettyJSON())) + _, _ = w.Write([]byte(build.PrettyJSON())) } diff --git a/pkg/server/build_test.go b/pkg/server/build_test.go index 15a33bbda8..575c49977e 100644 --- a/pkg/server/build_test.go +++ b/pkg/server/build_test.go @@ -28,7 +28,8 @@ var _ = Describe("server", func() { s, err := storage.New(&(*cfg).Server) Expect(err).ToNot(HaveOccurred()) c, _ := New(&(*cfg).Server, s, logrus.New()) - httpServer := httptest.NewServer(c.mux()) + h, _ := c.mux() + httpServer := httptest.NewServer(h) defer httpServer.Close() res, err := http.Get(httpServer.URL + "/build") diff --git a/pkg/server/config.go b/pkg/server/config.go index 09ca92f937..8e98a4f7e6 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -2,16 +2,15 @@ package server import ( "encoding/json" - "fmt" "net/http" ) -func (ctrl *Controller) configHandler(w http.ResponseWriter, r *http.Request) { +func (ctrl *Controller) configHandler(w http.ResponseWriter, _ *http.Request) { configBytes, err := json.MarshalIndent(ctrl.config, "", " ") if err != nil { - renderServerError(w, fmt.Sprintf("could not marshal buildInfoObj json: %q", err)) + ctrl.writeJSONEncodeError(w, err) return } w.Header().Set("Content-Type", "application/json") - w.Write(configBytes) + _, _ = w.Write(configBytes) } diff --git a/pkg/server/config_test.go b/pkg/server/config_test.go index aae60dda1e..34c4199e24 100644 --- a/pkg/server/config_test.go +++ b/pkg/server/config_test.go @@ -27,7 +27,8 @@ var _ = Describe("server", func() { s, err := storage.New(&(*cfg).Server) Expect(err).ToNot(HaveOccurred()) c, _ := New(&(*cfg).Server, s, logrus.New()) - httpServer := httptest.NewServer(c.mux()) + h, _ := c.mux() + httpServer := httptest.NewServer(h) defer httpServer.Close() res, err := http.Get(httpServer.URL + "/config") diff --git a/pkg/server/controller.go b/pkg/server/controller.go index ad4f2262fd..c1ad4ede83 100644 --- a/pkg/server/controller.go +++ b/pkg/server/controller.go @@ -2,6 +2,7 @@ package server import ( "context" + "errors" "fmt" golog "log" "net/http" @@ -78,28 +79,37 @@ func (ctrl *Controller) assetsFilesHandler(w http.ResponseWriter, r *http.Reques fs.ServeHTTP(w, r) } -func (ctrl *Controller) mux() http.Handler { +func (ctrl *Controller) mux() (http.Handler, error) { mux := http.NewServeMux() - addRoutes(mux, []route{ - {"/healthz", ctrl.healthz}, - {"/metrics", promhttp.Handler().ServeHTTP}, - {"/config", ctrl.configHandler}, - {"/build", ctrl.buildHandler}, - }) - // auth routes - addRoutes(mux, ctrl.getAuthRoutes(), ctrl.drainMiddleware) + // Routes not 
protected with auth. Drained at shutdown. + insecureRoutes, err := ctrl.getAuthRoutes() + if err != nil { + return nil, err + } + insecureRoutes = append(insecureRoutes, []route{ + {"/ingest", ctrl.ingestHandler}, + {"/forbidden", ctrl.forbiddenHandler()}, + {"/assets/", ctrl.assetsFilesHandler}, + }...) + addRoutes(mux, insecureRoutes, ctrl.drainMiddleware) - // drainable routes: + // Protected routes: routes := []route{ {"/", ctrl.indexHandler()}, {"/render", ctrl.renderHandler}, {"/labels", ctrl.labelsHandler}, {"/label-values", ctrl.labelValuesHandler}, } - addRoutes(mux, routes, ctrl.drainMiddleware, ctrl.authMiddleware) + // Diagnostic routes: not protected with auth, not drained. + addRoutes(mux, []route{ + {"/healthz", ctrl.healthz}, + {"/metrics", promhttp.Handler().ServeHTTP}, + {"/config", ctrl.configHandler}, + {"/build", ctrl.buildHandler}, + }) if !ctrl.config.DisablePprofEndpoint { addRoutes(mux, []route{ {"/debug/pprof/", pprof.Index}, @@ -110,14 +120,7 @@ func (ctrl *Controller) mux() http.Handler { }) } - nonAuthRoutes := []route{ - {"/ingest", ctrl.ingestHandler}, - {"/forbidden", ctrl.forbiddenHandler()}, - {"/assets/", ctrl.assetsFilesHandler}, - } - - addRoutes(mux, nonAuthRoutes, ctrl.drainMiddleware) - return mux + return mux, nil } type oauthInfo struct { @@ -180,7 +183,7 @@ func (ctrl *Controller) generateOauthInfo(oauthType int) *oauthInfo { return nil } -func (ctrl *Controller) getAuthRoutes() []route { +func (ctrl *Controller) getAuthRoutes() ([]route, error) { authRoutes := []route{ {"/login", ctrl.loginHandler()}, {"/logout", ctrl.logoutHandler()}, @@ -189,11 +192,11 @@ func (ctrl *Controller) getAuthRoutes() []route { if ctrl.config.GoogleEnabled { authURL, err := url.Parse(ctrl.config.GoogleAuthURL) if err != nil { - ctrl.log.WithError(err).Error("Problem parsing google auth url") + return nil, err } googleOauthInfo := ctrl.generateOauthInfo(oauthGoogle) - if err == nil && googleOauthInfo != nil { + if googleOauthInfo != nil { googleOauthInfo.AuthURL = authURL authRoutes = append(authRoutes, []route{ {"/auth/google/login", ctrl.oauthLoginHandler(googleOauthInfo)}, @@ -207,12 +210,11 @@ func (ctrl *Controller) getAuthRoutes() []route { if ctrl.config.GithubEnabled { authURL, err := url.Parse(ctrl.config.GithubAuthURL) if err != nil { - ctrl.log.WithError(err).Error("Problem parsing github auth url") - return nil + return nil, err } githubOauthInfo := ctrl.generateOauthInfo(oauthGithub) - if err == nil && githubOauthInfo != nil { + if githubOauthInfo != nil { githubOauthInfo.AuthURL = authURL authRoutes = append(authRoutes, []route{ {"/auth/github/login", ctrl.oauthLoginHandler(githubOauthInfo)}, @@ -225,12 +227,11 @@ func (ctrl *Controller) getAuthRoutes() []route { if ctrl.config.GitlabEnabled { authURL, err := url.Parse(ctrl.config.GitlabAuthURL) if err != nil { - ctrl.log.WithError(err).Error("Problem parsing gitlab auth url") - return nil + return nil, err } gitlabOauthInfo := ctrl.generateOauthInfo(oauthGitlab) - if err == nil && gitlabOauthInfo != nil { + if gitlabOauthInfo != nil { gitlabOauthInfo.AuthURL = authURL authRoutes = append(authRoutes, []route{ {"/auth/gitlab/login", ctrl.oauthLoginHandler(gitlabOauthInfo)}, @@ -240,7 +241,7 @@ func (ctrl *Controller) getAuthRoutes() []route { } } - return authRoutes + return authRoutes, nil } func (ctrl *Controller) Start() error { @@ -248,9 +249,14 @@ func (ctrl *Controller) Start() error { w := logger.Writer() defer w.Close() + handler, err := ctrl.mux() + if err != nil { + return err + } + 
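+ // Building the handler up front lets Start fail fast: malformed OAuth
+ // URLs now surface as errors from mux() rather than being logged and
+ // ignored while the affected auth routes were silently skipped.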
ctrl.httpServer = &http.Server{ Addr: ctrl.config.APIBindAddr, - Handler: ctrl.mux(), + Handler: handler, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, IdleTimeout: 30 * time.Second, @@ -260,11 +266,11 @@ func (ctrl *Controller) Start() error { // ListenAndServe always returns a non-nil error. After Shutdown or Close, // the returned error is ErrServerClosed. - err := ctrl.httpServer.ListenAndServe() - if err == http.ErrServerClosed { + err = ctrl.httpServer.ListenAndServe() + if errors.Is(err, http.ErrServerClosed) { return nil } - return fmt.Errorf("listen and serve: %v", err) + return err } func (ctrl *Controller) Stop() error { @@ -316,7 +322,7 @@ func (ctrl *Controller) authMiddleware(next http.HandlerFunc) http.HandlerFunc { }) if err != nil { - ctrl.log.WithError(err).Error("parsing jwt token") + ctrl.log.WithError(err).Error("invalid jwt token") http.Redirect(w, r, "/login", http.StatusTemporaryRedirect) return } @@ -324,3 +330,31 @@ func (ctrl *Controller) authMiddleware(next http.HandlerFunc) http.HandlerFunc { next.ServeHTTP(w, r) } } + +func (ctrl *Controller) writeInvalidParameterError(w http.ResponseWriter, err error) { + ctrl.writeError(w, http.StatusBadRequest, err, "invalid parameter") +} + +func (ctrl *Controller) writeInternalServerError(w http.ResponseWriter, err error, msg string) { + ctrl.writeError(w, http.StatusInternalServerError, err, msg) +} + +func (ctrl *Controller) writeJSONEncodeError(w http.ResponseWriter, err error) { + ctrl.writeInternalServerError(w, err, "encoding response body") +} + +func (ctrl *Controller) writeError(w http.ResponseWriter, code int, err error, msg string) { + logrus.WithError(err).Error(msg) + writeMessage(w, code, "%s: %q", msg, err) +} + +func (ctrl *Controller) writeErrorMessage(w http.ResponseWriter, code int, msg string) { + logrus.Error(msg) + writeMessage(w, code, msg) +} + +func writeMessage(w http.ResponseWriter, code int, format string, args ...interface{}) { + w.WriteHeader(code) + _, _ = fmt.Fprintf(w, format, args...) 
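+ // The trailing newline below mirrors the old renderServerError output
+ // (message followed by "\n") so plain-text error responses keep their
+ // previous shape.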
+ _, _ = fmt.Fprintln(w) +} diff --git a/pkg/server/handler.go b/pkg/server/handler.go index 2ff6dc37b9..217f66c1b6 100644 --- a/pkg/server/handler.go +++ b/pkg/server/handler.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "io/ioutil" "net/http" "net/url" @@ -15,42 +16,40 @@ import ( "time" "github.com/dgrijalva/jwt-go" - "github.com/pyroscope-io/pyroscope/pkg/build" "github.com/sirupsen/logrus" + + "github.com/pyroscope-io/pyroscope/pkg/build" ) func (ctrl *Controller) loginHandler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - tmplt, err := ctrl.getTemplate("/login.html") + tmpl, err := ctrl.getTemplate("/login.html") if err != nil { - renderServerError(w, err.Error()) + ctrl.writeInternalServerError(w, err, "could not render login page") return } - params := map[string]interface{}{ + mustExecute(tmpl, w, map[string]interface{}{ "GoogleEnabled": ctrl.config.GoogleEnabled, "GithubEnabled": ctrl.config.GithubEnabled, "GitlabEnabled": ctrl.config.GitlabEnabled, "BaseURL": ctrl.config.BaseURL, - } - - tmplt.Execute(w, params) + }) } } func createCookie(w http.ResponseWriter, name, value string) { - cookie := &http.Cookie{ + http.SetCookie(w, &http.Cookie{ Name: name, Path: "/", Value: value, HttpOnly: true, MaxAge: 0, SameSite: http.SameSiteStrictMode, - } - http.SetCookie(w, cookie) + }) } func invalidateCookie(w http.ResponseWriter, name string) { - cookie := &http.Cookie{ + http.SetCookie(w, &http.Cookie{ Name: name, Path: "/", Value: "", @@ -58,15 +57,13 @@ func invalidateCookie(w http.ResponseWriter, name string) { // MaxAge -1 request cookie be deleted immediately MaxAge: -1, SameSite: http.SameSiteStrictMode, - } - - http.SetCookie(w, cookie) + }) } func (ctrl *Controller) logoutHandler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" && r.Method != "DELETE" { - renderServerError(w, "you can only logout via a POST or DELETE") + ctrl.writeErrorMessage(w, http.StatusMethodNotAllowed, "only POST and DELETE are allowed") return } invalidateCookie(w, jwtCookieName) @@ -144,30 +141,26 @@ func (ctrl *Controller) oauthLoginHandler(info *oauthInfo) http.HandlerFunc { // this is done so that the state cookie would be send back from browser func (ctrl *Controller) callbackHandler(redirectURL string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - tmplt, err := ctrl.getTemplate("/redirect.html") + tmpl, err := ctrl.getTemplate("/redirect.html") if err != nil { - renderServerError(w, err.Error()) + ctrl.writeInternalServerError(w, err, "could not render redirect page") return } - - params := map[string]interface{}{ + mustExecute(tmpl, w, map[string]interface{}{ "RedirectURL": redirectURL + "?" 
+ r.URL.RawQuery, "BaseURL": ctrl.config.BaseURL, - } - - tmplt.Execute(w, params) + }) } } func (ctrl *Controller) forbiddenHandler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - tmplt, err := ctrl.getTemplate("/forbidden.html") + tmpl, err := ctrl.getTemplate("/forbidden.html") if err != nil { - renderServerError(w, err.Error()) + ctrl.writeInternalServerError(w, err, "could not render forbidden page") return } - - tmplt.Execute(w, map[string]interface{}{ + mustExecute(tmpl, w, map[string]interface{}{ "BaseURL": ctrl.config.BaseURL, }) } @@ -235,59 +228,43 @@ func (ctrl *Controller) newJWTToken(name string) (string, error) { } jwtToken := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) - tk, err := jwtToken.SignedString([]byte(ctrl.config.JWTSecret)) - if err != nil { - return "", err - } - - return tk, nil + return jwtToken.SignedString([]byte(ctrl.config.JWTSecret)) } -func (ctrl *Controller) logErrorAndRedirect(w http.ResponseWriter, r *http.Request, logString string, err error) { +func (ctrl *Controller) logErrorAndRedirect(w http.ResponseWriter, r *http.Request, msg string, err error) { if err != nil { - ctrl.log.WithError(err).Error(logString) + ctrl.log.WithError(err).Error(msg) } else { - ctrl.log.Error(logString) + ctrl.log.Error(msg) } - invalidateCookie(w, stateCookieName) - http.Redirect(w, r, "/forbidden", http.StatusTemporaryRedirect) - return } func (ctrl *Controller) callbackRedirectHandler(getAccountInfoURL string, info *oauthInfo, decodeResponse decodeResponseFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - callbackURL, err := getCallbackURL(r.Host, info.Config.RedirectURL, info.Type, r.URL.Query().Get("tls") == "true") - if err != nil { - ctrl.logErrorAndRedirect(w, r, "callbackURL parsing failed", nil) - ctrl.log.WithError(err).Error("") - return - } - - oauthConf := *info.Config - oauthConf.RedirectURL = callbackURL - cookie, err := r.Cookie(stateCookieName) if err != nil { ctrl.logErrorAndRedirect(w, r, "missing state cookie", err) return } - - cookieState := cookie.Value - requestState := r.FormValue("state") - - if requestState != cookieState { + if cookie.Value != r.FormValue("state") { ctrl.logErrorAndRedirect(w, r, "invalid oauth state", nil) return } - code := r.FormValue("code") if code == "" { ctrl.logErrorAndRedirect(w, r, "code not found", nil) return } + callbackURL, err := getCallbackURL(r.Host, info.Config.RedirectURL, info.Type, r.URL.Query().Get("tls") == "true") + if err != nil { + ctrl.logErrorAndRedirect(w, r, "callbackURL parsing failed", nil) + return + } + oauthConf := *info.Config + oauthConf.RedirectURL = callbackURL token, err := oauthConf.Exchange(r.Context(), code) if err != nil { ctrl.logErrorAndRedirect(w, r, "exchanging auth code for token failed", err) @@ -318,19 +295,16 @@ func (ctrl *Controller) callbackRedirectHandler(getAccountInfoURL string, info * invalidateCookie(w, stateCookieName) createCookie(w, jwtCookieName, tk) - tmplt, err := ctrl.getTemplate("/welcome.html") + tmpl, err := ctrl.getTemplate("/welcome.html") if err != nil { - renderServerError(w, err.Error()) + ctrl.writeInternalServerError(w, err, "could not render welcome page") return } - params := map[string]interface{}{ + mustExecute(tmpl, w, map[string]interface{}{ "Name": name, "BaseURL": ctrl.config.BaseURL, - } - - tmplt.Execute(w, params) - return + }) } } @@ -349,21 +323,9 @@ func (ctrl *Controller) indexHandler() http.HandlerFunc { } } -func renderServerError(rw http.ResponseWriter, text string) { - 
rw.WriteHeader(500) - rw.Write([]byte(text)) - rw.Write([]byte("\n")) -} - type indexPageJSON struct { AppNames []string `json:"appNames"` } -type indexPage struct { - InitialState string - BuildInfo string - ExtraMetadata string - BaseURL string -} func (ctrl *Controller) getTemplate(path string) (*template.Template, error) { f, err := ctrl.dir.Open(path) @@ -383,11 +345,10 @@ func (ctrl *Controller) getTemplate(path string) (*template.Template, error) { return tmpl, nil } -func (ctrl *Controller) renderIndexPage(rw http.ResponseWriter, _ *http.Request) { - var b []byte +func (ctrl *Controller) renderIndexPage(w http.ResponseWriter, _ *http.Request) { tmpl, err := ctrl.getTemplate("/index.html") if err != nil { - renderServerError(rw, err.Error()) + ctrl.writeInternalServerError(w, err, "could not render index page") return } @@ -396,13 +357,15 @@ func (ctrl *Controller) renderIndexPage(rw http.ResponseWriter, _ *http.Request) initialStateObj.AppNames = append(initialStateObj.AppNames, v) return true }) + + var b []byte b, err = json.Marshal(initialStateObj) if err != nil { - renderServerError(rw, fmt.Sprintf("could not marshal initialStateObj json: %q", err)) + ctrl.writeJSONEncodeError(w, err) return } - initialStateStr := string(b) + initialStateStr := string(b) var extraMetadataStr string extraMetadataPath := os.Getenv("PYROSCOPE_EXTRA_METADATA") if extraMetadataPath != "" { @@ -413,15 +376,17 @@ func (ctrl *Controller) renderIndexPage(rw http.ResponseWriter, _ *http.Request) extraMetadataStr = string(b) } - rw.Header().Add("Content-Type", "text/html") - err = tmpl.Execute(rw, indexPage{ - InitialState: initialStateStr, - BuildInfo: build.JSON(), - ExtraMetadata: extraMetadataStr, - BaseURL: ctrl.config.BaseURL, + w.Header().Add("Content-Type", "text/html") + mustExecute(tmpl, w, map[string]string{ + "InitialState": initialStateStr, + "BuildInfo": build.JSON(), + "ExtraMetadata": extraMetadataStr, + "BaseURL": ctrl.config.BaseURL, }) - if err != nil { - renderServerError(rw, fmt.Sprintf("could not marshal json: %q", err)) - return +} + +func mustExecute(t *template.Template, w io.Writer, v interface{}) { + if err := t.Execute(w, v); err != nil { + panic(err) } } diff --git a/pkg/server/healthz.go b/pkg/server/healthz.go index 5b9bca3317..00c6062b37 100644 --- a/pkg/server/healthz.go +++ b/pkg/server/healthz.go @@ -4,7 +4,6 @@ import ( "net/http" ) - func (*Controller) healthz(w http.ResponseWriter, _ *http.Request) { - w.Write([]byte("server is ready")) + _, _ = w.Write([]byte("server is ready")) } diff --git a/pkg/server/ingest.go b/pkg/server/ingest.go index 37f6d327d6..f7eec1ed13 100644 --- a/pkg/server/ingest.go +++ b/pkg/server/ingest.go @@ -1,22 +1,25 @@ package server import ( + "fmt" "io" "net/http" "strconv" "time" + "github.com/sirupsen/logrus" + "github.com/pyroscope-io/pyroscope/pkg/agent/types" "github.com/pyroscope-io/pyroscope/pkg/convert" "github.com/pyroscope-io/pyroscope/pkg/storage" + "github.com/pyroscope-io/pyroscope/pkg/storage/segment" "github.com/pyroscope-io/pyroscope/pkg/storage/tree" "github.com/pyroscope-io/pyroscope/pkg/util/attime" - "github.com/sirupsen/logrus" ) type ingestParams struct { parserFunc func(io.Reader) (*tree.Tree, error) - storageKey *storage.Key + storageKey *segment.Key spyName string sampleRate uint32 units string @@ -26,32 +29,53 @@ type ingestParams struct { until time.Time } -func wrapConvertFunction(convertFunc func(r io.Reader, cb func(name []byte, val int)) error) func(io.Reader) (*tree.Tree, error) { - return func(r io.Reader) 
(*tree.Tree, error) { - t := tree.New() - if err := convertFunc(r, func(k []byte, v int) { - t.Insert(k, uint64(v)) - }); err != nil { - return nil, err - } +func (ctrl *Controller) ingestHandler(w http.ResponseWriter, r *http.Request) { + var ip ingestParams + if err := ctrl.ingestParamsFromRequest(r, &ip); err != nil { + ctrl.writeInvalidParameterError(w, err) + return + } - return t, nil + var t *tree.Tree + t, err := ip.parserFunc(r.Body) + if err != nil { + ctrl.writeError(w, http.StatusUnprocessableEntity, err, "error happened while parsing request body") + return } + + err = ctrl.storage.Put(&storage.PutInput{ + StartTime: ip.from, + EndTime: ip.until, + Key: ip.storageKey, + Val: t, + SpyName: ip.spyName, + SampleRate: ip.sampleRate, + Units: ip.units, + AggregationType: ip.aggregationType, + }) + if err != nil { + ctrl.writeInternalServerError(w, err, "error happened while ingesting data") + return + } + + ctrl.statsInc("ingest") + ctrl.statsInc("ingest:" + ip.spyName) + k := *ip.storageKey + ctrl.appStats.Add(hashString(k.AppName())) } -func ingestParamsFromRequest(r *http.Request) *ingestParams { - ip := &ingestParams{} +func (ctrl *Controller) ingestParamsFromRequest(r *http.Request, ip *ingestParams) error { q := r.URL.Query() - format := q.Get("format") - - if format == "tree" || r.Header.Get("Content-Type") == "binary/octet-stream+tree" { + contentType := r.Header.Get("Content-Type") + switch { + case format == "tree", contentType == "binary/octet-stream+tree": ip.parserFunc = tree.DeserializeNoDict - } else if format == "trie" || r.Header.Get("Content-Type") == "binary/octet-stream+trie" { + case format == "trie", contentType == "binary/octet-stream+trie": ip.parserFunc = wrapConvertFunction(convert.ParseTrie) - } else if format == "lines" { + case format == "lines": ip.parserFunc = wrapConvertFunction(convert.ParseIndividualLines) - } else { + default: ip.parserFunc = wrapConvertFunction(convert.ParseGroups) } @@ -99,44 +123,21 @@ func ingestParamsFromRequest(r *http.Request) *ingestParams { } var err error - ip.storageKey, err = storage.ParseKey(q.Get("name")) + ip.storageKey, err = segment.ParseKey(q.Get("name")) if err != nil { - logrus.Error("parsing error:", err) + return fmt.Errorf("name: %w", err) } - - return ip + return nil } -func (ctrl *Controller) ingestHandler(w http.ResponseWriter, r *http.Request) { - ip := ingestParamsFromRequest(r) - var t *tree.Tree - t, err := ip.parserFunc(r.Body) - if err != nil { - returnError(w, 422, err, "error happened while parsing data") - return - } - - err = ctrl.storage.Put(&storage.PutInput{ - StartTime: ip.from, - EndTime: ip.until, - Key: ip.storageKey, - Val: t, - SpyName: ip.spyName, - SampleRate: ip.sampleRate, - Units: ip.units, - AggregationType: ip.aggregationType, - }) - if err != nil { - returnError(w, 503, err, "error happened while inserting data") - return +func wrapConvertFunction(convertFunc func(r io.Reader, cb func(name []byte, val int)) error) func(io.Reader) (*tree.Tree, error) { + return func(r io.Reader) (*tree.Tree, error) { + t := tree.New() + if err := convertFunc(r, func(k []byte, v int) { + t.Insert(k, uint64(v)) + }); err != nil { + return nil, err + } + return t, nil } - ctrl.statsInc("ingest") - ctrl.statsInc("ingest:" + ip.spyName) - k := *ip.storageKey - ctrl.appStats.Add(hashString(k.AppName())) -} - -func returnError(w http.ResponseWriter, status int, err error, errMessage string) { - logrus.WithField("err", err).Error(errMessage) - w.WriteHeader(status) } diff --git 
a/pkg/server/ingest_test.go b/pkg/server/ingest_test.go index 7ef2d3909e..9620b41d67 100644 --- a/pkg/server/ingest_test.go +++ b/pkg/server/ingest_test.go @@ -14,6 +14,7 @@ import ( "github.com/pyroscope-io/pyroscope/pkg/config" "github.com/pyroscope-io/pyroscope/pkg/storage" + "github.com/pyroscope-io/pyroscope/pkg/storage/segment" "github.com/pyroscope-io/pyroscope/pkg/testing" ) @@ -27,6 +28,7 @@ var _ = Describe("server", func() { var buf *bytes.Buffer var format string var contentType string + var name string // this is an example of Shared Example pattern // see https://onsi.github.io/ginkgo/#shared-example-patterns @@ -39,16 +41,18 @@ var _ = Describe("server", func() { s, err := storage.New(&(*cfg).Server) Expect(err).ToNot(HaveOccurred()) c, _ := New(&(*cfg).Server, s, logrus.New()) - httpServer := httptest.NewServer(c.mux()) + h, _ := c.mux() + httpServer := httptest.NewServer(h) defer s.Close() - name := "test.app{}" - st := testing.ParseTime("2020-01-01-01:01:00") et := testing.ParseTime("2020-01-01-01:01:10") u, _ := url.Parse(httpServer.URL + "/ingest") q := u.Query() + if name == "" { + name = "test.app{}" + } q.Add("name", name) q.Add("from", strconv.Itoa(int(st.Unix()))) q.Add("until", strconv.Itoa(int(et.Unix()))) @@ -70,7 +74,7 @@ var _ = Describe("server", func() { Expect(err).ToNot(HaveOccurred()) Expect(res.StatusCode).To(Equal(200)) - sk, _ := storage.ParseKey(name) + sk, _ := segment.ParseKey(name) gOut, err := s.Get(&storage.GetInput{ StartTime: st, EndTime: et, @@ -144,6 +148,17 @@ var _ = Describe("server", func() { ItCorrectlyParsesIncomingData() }) + + Context("name with tags", func() { + BeforeEach(func() { + buf = bytes.NewBuffer([]byte("foo;bar 2\nfoo;baz 3\n")) + format = "" + contentType = "" + name = "test.app{foo=bar,baz=qux}" + }) + + ItCorrectlyParsesIncomingData() + }) }) }) }) diff --git a/pkg/server/labels.go b/pkg/server/labels.go index be137abfbd..a932526525 100644 --- a/pkg/server/labels.go +++ b/pkg/server/labels.go @@ -6,28 +6,34 @@ import ( ) func (ctrl *Controller) labelsHandler(w http.ResponseWriter, _ *http.Request) { - res := []string{} + var keys []string ctrl.storage.GetKeys(func(k string) bool { - res = append(res, k) + keys = append(keys, k) return true }) - b, err := json.Marshal(res) + b, err := json.Marshal(keys) if err != nil { - panic(err) // TODO: handle + ctrl.writeJSONEncodeError(w, err) + return } - w.Write(b) + _, _ = w.Write(b) } func (ctrl *Controller) labelValuesHandler(w http.ResponseWriter, r *http.Request) { - res := []string{} labelName := r.URL.Query().Get("label") + if labelName == "" { + ctrl.writeInvalidParameterError(w, errLabelIsRequired) + return + } + var values []string ctrl.storage.GetValues(labelName, func(v string) bool { - res = append(res, v) + values = append(values, v) return true }) - b, err := json.Marshal(res) + b, err := json.Marshal(values) if err != nil { - panic(err) // TODO: handle + ctrl.writeJSONEncodeError(w, err) + return } - w.Write(b) + _, _ = w.Write(b) } diff --git a/pkg/server/render.go b/pkg/server/render.go index a7fffc0ab5..561078782c 100644 --- a/pkg/server/render.go +++ b/pkg/server/render.go @@ -2,76 +2,107 @@ package server import ( "encoding/json" + "errors" + "fmt" "net/http" "strconv" - "time" + "github.com/pyroscope-io/pyroscope/pkg/flameql" "github.com/pyroscope-io/pyroscope/pkg/storage" + "github.com/pyroscope-io/pyroscope/pkg/storage/segment" "github.com/pyroscope-io/pyroscope/pkg/storage/tree" "github.com/pyroscope-io/pyroscope/pkg/util/attime" ) -type samplesEntry 
struct { - Ts time.Time `json:"ts"` - Samples uint16 `json:"samples"` +var ( + errUnknownFormat = errors.New("unknown format") + errLabelIsRequired = errors.New("label parameter is required") +) + +type renderParams struct { + format string + maxNodes int + gi *storage.GetInput } func (ctrl *Controller) renderHandler(w http.ResponseWriter, r *http.Request) { - q := r.URL.Query() - startTime := attime.Parse(q.Get("from")) - endTime := attime.Parse(q.Get("until")) - var err error - storageKey, err := storage.ParseKey(q.Get("name")) - if err != nil { - panic(err) // TODO: handle + var p renderParams + if err := ctrl.renderParametersFromRequest(r, &p); err != nil { + ctrl.writeInvalidParameterError(w, err) + return + } + + switch p.format { + case "json", "": + default: + ctrl.writeInvalidParameterError(w, errUnknownFormat) + return } - gOut, err := ctrl.storage.Get(&storage.GetInput{ - StartTime: startTime, - EndTime: endTime, - Key: storageKey, - }) + out, err := ctrl.storage.Get(p.gi) ctrl.statsInc("render") if err != nil { - panic(err) // TODO: handle + ctrl.writeInternalServerError(w, err, "failed to retrieve data") + return } // TODO: handle properly - if gOut == nil { - gOut = &storage.GetOutput{ - Tree: tree.New(), - } + if out == nil { + out = &storage.GetOutput{Tree: tree.New()} } - maxNodes := ctrl.config.MaxNodesRender - if mn, err := strconv.Atoi(q.Get("max-nodes")); err == nil && mn > 0 { - maxNodes = mn + w.Header().Set("Content-Type", "application/json") + fs := out.Tree.FlamebearerStruct(p.maxNodes) + // TODO remove this duplication? We're already adding this to metadata + fs.SpyName = out.SpyName + fs.SampleRate = out.SampleRate + fs.Units = out.Units + res := map[string]interface{}{ + "timeline": out.Timeline, + "flamebearer": fs, + "metadata": map[string]interface{}{ + "spyName": out.SpyName, + "sampleRate": out.SampleRate, + "units": out.Units, + }, } - switch q.Get("format") { - case "json": - w.Header().Set("Content-Type", "application/json") + if err = json.NewEncoder(w).Encode(res); err != nil { + ctrl.writeJSONEncodeError(w, err) + } +} + +func (ctrl *Controller) renderParametersFromRequest(r *http.Request, p *renderParams) error { + v := r.URL.Query() + p.gi = new(storage.GetInput) - fs := gOut.Tree.FlamebearerStruct(maxNodes) - // TODO remove this duplication? 
We're already adding this to metadata - fs.SpyName = gOut.SpyName - fs.SampleRate = gOut.SampleRate - fs.Units = gOut.Units - res := map[string]interface{}{ - "timeline": gOut.Timeline, - "flamebearer": fs, - "metadata": map[string]interface{}{ - "spyName": gOut.SpyName, - "sampleRate": gOut.SampleRate, - "units": gOut.Units, - }, + k := v.Get("name") + q := v.Get("query") + + switch { + case k == "" && q == "": + return fmt.Errorf("'query' or 'name' parameter is required") + case k != "": + sk, err := segment.ParseKey(k) + if err != nil { + return fmt.Errorf("name: parsing storage key: %w", err) + } + p.gi.Key = sk + case q != "": + qry, err := flameql.ParseQuery(q) + if err != nil { + return fmt.Errorf("query: %w", err) } + p.gi.Query = qry + } - encoder := json.NewEncoder(w) - encoder.Encode(res) - return - default: - // TODO: add handling for other cases - w.WriteHeader(422) + p.maxNodes = ctrl.config.MaxNodesRender + if mn, err := strconv.Atoi(v.Get("max-nodes")); err == nil && mn > 0 { + p.maxNodes = mn } + + p.gi.StartTime = attime.Parse(v.Get("from")) + p.gi.EndTime = attime.Parse(v.Get("until")) + p.format = v.Get("format") + return nil } diff --git a/pkg/server/render_test.go b/pkg/server/render_test.go new file mode 100644 index 0000000000..95b1e78150 --- /dev/null +++ b/pkg/server/render_test.go @@ -0,0 +1,49 @@ +package server + +import ( + "fmt" + "net/http" + "net/http/httptest" + "net/url" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" + + "github.com/pyroscope-io/pyroscope/pkg/config" + "github.com/pyroscope-io/pyroscope/pkg/storage" + "github.com/pyroscope-io/pyroscope/pkg/testing" +) + +var _ = Describe("server", func() { + testing.WithConfig(func(cfg **config.Config) { + Context("/render", func() { + It("supports name and query parameters", func() { + var httpServer *httptest.Server + (*cfg).Server.APIBindAddr = ":10044" + s, err := storage.New(&(*cfg).Server) + Expect(err).ToNot(HaveOccurred()) + c, _ := New(&(*cfg).Server, s, logrus.New()) + h, _ := c.mux() + httpServer = httptest.NewServer(h) + defer httpServer.Close() + + resp, err := http.Get(fmt.Sprintf("%s/render?name=%s", httpServer.URL, url.QueryEscape(`app`))) + Expect(err).ToNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + + resp, err = http.Get(fmt.Sprintf("%s/render?query=%s", httpServer.URL, url.QueryEscape(`app{foo="bar"}`))) + Expect(err).ToNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + + resp, err = http.Get(fmt.Sprintf("%s/render?query=%s", httpServer.URL, url.QueryEscape(`app{foo"bar"}`))) + Expect(err).ToNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusBadRequest)) + + resp, err = http.Get(fmt.Sprintf("%s/render", httpServer.URL)) + Expect(err).ToNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusBadRequest)) + }) + }) + }) +}) diff --git a/pkg/storage/cache/cache.go b/pkg/storage/cache/cache.go index 8a2093ecef..f0fafc3ee2 100644 --- a/pkg/storage/cache/cache.go +++ b/pkg/storage/cache/cache.go @@ -5,11 +5,10 @@ import ( "fmt" "sync" + "github.com/dgraph-io/badger/v2" "github.com/dgrijalva/lfu-go" - "github.com/pyroscope-io/pyroscope/pkg/util/metrics" - "github.com/sirupsen/logrus" - "github.com/dgraph-io/badger/v2" + "github.com/pyroscope-io/pyroscope/pkg/util/metrics" ) type Cache struct { @@ -147,14 +146,38 @@ func (cache *Cache) Delete(key string) error { return err } -func (cache *Cache) Get(key string) (interface{}, error) { +func (cache *Cache) GetOrCreate(key string) 
(interface{}, error) { + v, err := cache.lookup(key) // find the key from cache first + if err != nil { + return nil, err + } + if v != nil { + return v, nil + } + if cache.New == nil { + return nil, errors.New("cache's New function is nil") + } + v = cache.New(key) + cache.lfu.Set(key, v) + return v, nil +} + +func (cache *Cache) Lookup(key string) (interface{}, bool) { + v, err := cache.lookup(key) + if v == nil || err != nil { + return nil, false + } + return v, true +} + +func (cache *Cache) lookup(key string) (interface{}, error) { // find the key from cache first val := cache.lfu.Get(key) if val != nil { metrics.Count(cache.hitCounter, 1) return val, nil } - logrus.WithField("key", key).Debug("lfu miss") + // logrus.WithField("key", key).Debug("lfu miss") metrics.Count(cache.missCounter, 1) var copied []byte @@ -182,15 +205,8 @@ func (cache *Cache) Get(key string) (interface{}, error) { // if it's not found from badger, create a new object if copied == nil { - logrus.WithField("key", key).Debug("storage miss") - - if cache.New == nil { - return nil, errors.New("cache's New function is nil") - } - - newVal := cache.New(key) - cache.lfu.Set(key, newVal) - return newVal, nil + // logrus.WithField("key", key).Debug("storage miss") + return nil, nil } // deserialize the object from storage @@ -205,7 +221,7 @@ func (cache *Cache) Get(key string) (interface{}, error) { cache.saveToDisk(key, val) } - logrus.WithField("key", key).Debug("storage hit") + // logrus.WithField("key", key).Debug("storage hit") return val, nil } diff --git a/pkg/storage/cache/cache_test.go b/pkg/storage/cache/cache_test.go index 5dd0a00ee9..0cfed83fb9 100644 --- a/pkg/storage/cache/cache_test.go +++ b/pkg/storage/cache/cache_test.go @@ -46,15 +46,15 @@ var _ = Describe("cache", func() { } log.Printf("size: %d", cache.Len()) - v, err := cache.Get("foo-199") + v, err := cache.GetOrCreate("foo-199") Expect(err).ToNot(HaveOccurred()) Expect(v).To(Equal("bar-199")) - v, err = cache.Get("foo-1") + v, err = cache.GetOrCreate("foo-1") Expect(err).ToNot(HaveOccurred()) Expect(v).To(Equal("bar-1")) - v, err = cache.Get("foo-1234") + v, err = cache.GetOrCreate("foo-1234") Expect(err).ToNot(HaveOccurred()) Expect(v).To(Equal("foo-1234")) cache.Flush() diff --git a/pkg/storage/dimension/dimension.go b/pkg/storage/dimension/dimension.go index 2248b4b056..3ee81c42c2 100644 --- a/pkg/storage/dimension/dimension.go +++ b/pkg/storage/dimension/dimension.go @@ -11,12 +11,12 @@ type Key []byte type Dimension struct { m sync.RWMutex // keys are sorted - keys []Key + Keys []Key } func New() *Dimension { return &Dimension{ - keys: []Key{}, + Keys: []Key{}, } } @@ -24,18 +24,18 @@ func (d *Dimension) Insert(key Key) { d.m.Lock() defer d.m.Unlock() - i := sort.Search(len(d.keys), func(i int) bool { - return bytes.Compare(d.keys[i], key) >= 0 + i := sort.Search(len(d.Keys), func(i int) bool { + return bytes.Compare(d.Keys[i], key) >= 0 }) - if i < len(d.keys) && bytes.Compare(d.keys[i], key) == 0 { + if i < len(d.Keys) && bytes.Compare(d.Keys[i], key) == 0 { return } - if i > len(d.keys)-1 || !bytes.Equal(d.keys[i], key) { - d.keys = append(d.keys, key) - copy(d.keys[i+1:], d.keys[i:]) - d.keys[i] = key + if i > len(d.Keys)-1 || !bytes.Equal(d.Keys[i], key) { + d.Keys = append(d.Keys, key) + copy(d.Keys[i+1:], d.Keys[i:]) + d.Keys[i] = key } } @@ -43,12 +43,12 @@ func (d *Dimension) Delete(key Key) { d.m.Lock() defer d.m.Unlock() - i := sort.Search(len(d.keys), func(i int) bool { - return bytes.Compare(d.keys[i], key) >= 0 + i := 
sort.Search(len(d.Keys), func(i int) bool { + return bytes.Compare(d.Keys[i], key) >= 0 }) - if i < len(d.keys) && bytes.Compare(d.keys[i], key) == 0 { - d.keys = append(d.keys[:i], d.keys[i+1:]...) + if i < len(d.Keys) && bytes.Compare(d.Keys[i], key) == 0 { + d.Keys = append(d.Keys[:i], d.Keys[i+1:]...) return } } @@ -106,7 +106,7 @@ func Intersection(input ...*Dimension) []Key { if len(input) == 0 { return []Key{} } else if len(input) == 1 { - return input[0].keys + return input[0].Keys } result := []Key{} @@ -114,17 +114,13 @@ func Intersection(input ...*Dimension) []Key { dims := []*sortableDim{} for _, v := range input { - if len(v.keys) == 0 { + if len(v.Keys) == 0 { return []Key{} } - // kinda ugly imo - v.m.RLock() - defer v.m.RUnlock() - dims = append(dims, &sortableDim{ - keys: v.keys, + keys: v.Keys, i: 0, - l: len(v.keys), + l: len(v.Keys), }) } @@ -165,7 +161,7 @@ func Union(input ...*Dimension) []Key { if len(input) == 0 { return []Key{} } else if len(input) == 1 { - return input[0].keys + return input[0].Keys } result := []Key{} @@ -173,7 +169,7 @@ func Union(input ...*Dimension) []Key { isExists := map[string]bool{} for _, v := range input { - for _, k := range v.keys { + for _, k := range v.Keys { if !isExists[string(k)] { result = append(result, k) } @@ -184,3 +180,35 @@ func Union(input ...*Dimension) []Key { return result } + +// TODO: rework +func AndNot(a, b *Dimension) []Key { + a.m.RLock() + defer a.m.RUnlock() + if len(a.Keys) == 0 { + return nil + } + + b.m.RLock() + defer b.m.RUnlock() + if len(b.Keys) == 0 { + r := make([]Key, len(a.Keys)) + copy(r, a.Keys) + return r + } + + r := make([]Key, 0, len(a.Keys)) + m := make(map[string]struct{}, len(b.Keys)) + + for _, k := range b.Keys { + m[string(k)] = struct{}{} + } + + for _, k := range a.Keys { + if _, ok := m[string(k)]; !ok { + r = append(r, k) + } + } + + return r +} diff --git a/pkg/storage/dimension/serialization.go b/pkg/storage/dimension/serialization.go index 282254f995..bdb6d7a8ae 100644 --- a/pkg/storage/dimension/serialization.go +++ b/pkg/storage/dimension/serialization.go @@ -14,7 +14,7 @@ const currentVersion = 1 func (s *Dimension) Serialize(w io.Writer) error { varint.Write(w, currentVersion) - for _, k := range s.keys { + for _, k := range s.Keys { varint.Write(w, uint64(len(k))) w.Write([]byte(k)) } @@ -46,7 +46,7 @@ func Deserialize(r io.Reader) (*Dimension, error) { return nil, err } - s.keys = append(s.keys, Key(keyBuf)) + s.Keys = append(s.Keys, Key(keyBuf)) } return s, nil diff --git a/pkg/storage/local.go b/pkg/storage/local.go index 747a4a660f..5fef363ec6 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -8,9 +8,11 @@ import ( "os" "path/filepath" + "github.com/sirupsen/logrus" + + "github.com/pyroscope-io/pyroscope/pkg/storage/segment" "github.com/pyroscope-io/pyroscope/pkg/storage/tree" "github.com/pyroscope-io/pyroscope/pkg/util/varint" - "github.com/sirupsen/logrus" ) func (s *Storage) collectLocalProfile(path string) error { @@ -58,7 +60,7 @@ func (s *Storage) collectLocalProfile(path string) error { return err } - pi.Key, err = ParseKey(string(nameBuf)) + pi.Key, err = segment.ParseKey(string(nameBuf)) if err != nil { return err } diff --git a/pkg/storage/periodic.go b/pkg/storage/periodic.go index fb5eaf8eea..935307a34b 100644 --- a/pkg/storage/periodic.go +++ b/pkg/storage/periodic.go @@ -60,7 +60,6 @@ func (s *Storage) evictionTask(memTotal uint64) func() { s.dicts.Evict(percent / 4) s.segments.Evict(percent / 2) s.trees.Evict(percent) - runtime.GC() }) } } @@ 
-73,7 +72,6 @@ func (s *Storage) writeBackTask() { s.segments.WriteBack() s.dicts.WriteBack() s.trees.WriteBack() - runtime.GC() }) } diff --git a/pkg/storage/query.go b/pkg/storage/query.go new file mode 100644 index 0000000000..c1453f9cb1 --- /dev/null +++ b/pkg/storage/query.go @@ -0,0 +1,87 @@ +package storage + +import ( + "context" + + "github.com/pyroscope-io/pyroscope/pkg/flameql" + "github.com/pyroscope-io/pyroscope/pkg/storage/dimension" +) + +func (s *Storage) exec(_ context.Context, qry *flameql.Query) []dimension.Key { + app, found := s.lookupAppDimension(qry.AppName) + if !found { + return nil + } + if len(qry.Matchers) == 0 { + return app.Keys + } + + r := []*dimension.Dimension{app} + var n []*dimension.Dimension // Keys to be removed from the result. + + for _, m := range qry.Matchers { + switch m.Op { + case flameql.EQL: + if d, ok := s.lookupDimension(m); ok { + r = append(r, d) + } else { + return nil + } + case flameql.NEQ: + if d, ok := s.lookupDimension(m); ok { + n = append(n, d) + } + case flameql.EQL_REGEX: + if d, ok := s.lookupDimensionRegex(m); ok { + r = append(r, d) + } else { + return nil + } + case flameql.NEQ_REGEX: + if d, ok := s.lookupDimensionRegex(m); ok { + n = append(n, d) + } + } + } + + i := dimension.Intersection(r...) + if len(n) == 0 { + return i + } + + return dimension.AndNot( + &dimension.Dimension{Keys: i}, + &dimension.Dimension{Keys: dimension.Union(n...)}) +} + +func (s *Storage) lookupAppDimension(app string) (*dimension.Dimension, bool) { + return s.lookupDimensionKV("__name__", app) +} + +func (s *Storage) lookupDimension(m *flameql.TagMatcher) (*dimension.Dimension, bool) { + return s.lookupDimensionKV(m.Key, m.Value) +} + +func (s *Storage) lookupDimensionRegex(m *flameql.TagMatcher) (*dimension.Dimension, bool) { + d := dimension.New() + s.labels.GetValues(m.Key, func(v string) bool { + if m.R.MatchString(v) { + if x, ok := s.lookupDimensionKV(m.Key, v); ok { + d.Keys = append(d.Keys, x.Keys...) + } + } + return true + }) + if len(d.Keys) > 0 { + return d, true + } + return nil, false +} + +func (s *Storage) lookupDimensionKV(k, v string) (*dimension.Dimension, bool) { + r, ok := s.dimensions.Lookup(k + ":" + v) + if ok { + return r.(*dimension.Dimension), true + } + return nil, false +} diff --git a/pkg/storage/query_test.go b/pkg/storage/query_test.go new file mode 100644 index 0000000000..a4cef80cd0 --- /dev/null +++ b/pkg/storage/query_test.go @@ -0,0 +1,177 @@ +package storage + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" + + "github.com/pyroscope-io/pyroscope/pkg/config" + "github.com/pyroscope-io/pyroscope/pkg/flameql" + "github.com/pyroscope-io/pyroscope/pkg/storage/dimension" + "github.com/pyroscope-io/pyroscope/pkg/storage/segment" + "github.com/pyroscope-io/pyroscope/pkg/storage/tree" + "github.com/pyroscope-io/pyroscope/pkg/testing" +) + +var _ = Describe("Querying", func() { + testing.WithConfig(func(cfg **config.Config) { + JustBeforeEach(func() { + var err error + s, err = New(&(*cfg).Server) + Expect(err).ToNot(HaveOccurred()) + keys := []string{ + "app.name{foo=bar,baz=qux}", + "app.name{foo=bar,baz=xxx}", + "app.name{waldo=fred,baz=xxx}", + } + for _, k := range keys { + t := tree.New() + t.Insert([]byte("a;b"), uint64(1)) + t.Insert([]byte("a;c"), uint64(2)) + st := testing.SimpleTime(10) + et := testing.SimpleTime(19) + key, err := segment.ParseKey(k) + Expect(err).ToNot(HaveOccurred()) + err = s.Put(&PutInput{ + StartTime: st, + EndTime: et, + Key: key, + Val: t, + SpyName: "testspy", + SampleRate: 100, + }) + Expect(err).ToNot(HaveOccurred()) + } + }) + + Context("basic queries", func() { + It("get returns result with query", func() { + qry, err := flameql.ParseQuery(`app.name{foo="bar"}`) + Expect(err).ToNot(HaveOccurred()) + output, err := s.Get(&GetInput{ + StartTime: time.Time{}, + EndTime: maxTime, + Query: qry, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(output).ToNot(BeNil()) + Expect(output.Tree).ToNot(BeNil()) + Expect(output.Tree.Samples()).To(Equal(uint64(6))) + }) + + It("get returns a particular tree for a fully qualified key", func() { + k, err := segment.ParseKey(`app.name{foo=bar,baz=qux}`) + Expect(err).ToNot(HaveOccurred()) + output, err := s.Get(&GetInput{ + StartTime: time.Time{}, + EndTime: maxTime, + Key: k, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(output).ToNot(BeNil()) + Expect(output.Tree).ToNot(BeNil()) + Expect(output.Tree.Samples()).To(Equal(uint64(3))) + }) + + It("get returns all results for a key containing only app name", func() { + k, err := segment.ParseKey(`app.name`) + Expect(err).ToNot(HaveOccurred()) + output, err := s.Get(&GetInput{ + StartTime: time.Time{}, + EndTime: maxTime, + Key: k, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(output).ToNot(BeNil()) + Expect(output.Tree).ToNot(BeNil()) + Expect(output.Tree.Samples()).To(Equal(uint64(9))) + }) + + It("query returns expected results", func() { + type testCase struct { + query string + segmentKeys []dimension.Key + } + + testCases := []testCase{ + {`app.name`, []dimension.Key{ + dimension.Key("app.name{baz=qux,foo=bar}"), + dimension.Key("app.name{baz=xxx,foo=bar}"), + dimension.Key("app.name{baz=xxx,waldo=fred}"), + }}, + {`app.name{foo="bar"}`, []dimension.Key{ + dimension.Key("app.name{baz=qux,foo=bar}"), + dimension.Key("app.name{baz=xxx,foo=bar}"), + }}, + {`app.name{foo=~"^b.*"}`, []dimension.Key{ + dimension.Key("app.name{baz=qux,foo=bar}"), + dimension.Key("app.name{baz=xxx,foo=bar}"), + }}, + {`app.name{baz=~"xxx|qux"}`, []dimension.Key{ + dimension.Key("app.name{baz=qux,foo=bar}"), + dimension.Key("app.name{baz=xxx,foo=bar}"), + dimension.Key("app.name{baz=xxx,waldo=fred}"), + }}, + {`app.name{baz!="xxx"}`, []dimension.Key{ + dimension.Key("app.name{baz=qux,foo=bar}"), + }}, + {`app.name{foo!="bar"}`, []dimension.Key{ + dimension.Key("app.name{baz=xxx,waldo=fred}"), + }}, + {`app.name{foo!~".*"}`, []dimension.Key{ + dimension.Key("app.name{baz=xxx,waldo=fred}"), + }}, + {`app.name{baz!~"^x.*"}`, []dimension.Key{ + 
dimension.Key("app.name{baz=qux,foo=bar}"), + }}, + {`app.name{foo="bar",baz!~"^x.*"}`, []dimension.Key{ + dimension.Key("app.name{baz=qux,foo=bar}"), + }}, + + {`app.name{foo=~"b.*",foo!~".*r"}`, nil}, + + {`app.name{foo!="non-existing-value"}`, []dimension.Key{ + dimension.Key("app.name{baz=qux,foo=bar}"), + dimension.Key("app.name{baz=xxx,foo=bar}"), + dimension.Key("app.name{baz=xxx,waldo=fred}"), + }}, + {`app.name{foo!~"non-existing-.*"}`, []dimension.Key{ + dimension.Key("app.name{baz=qux,foo=bar}"), + dimension.Key("app.name{baz=xxx,foo=bar}"), + dimension.Key("app.name{baz=xxx,waldo=fred}"), + }}, + {`app.name{non-existing-key!="bar"}`, []dimension.Key{ + dimension.Key("app.name{baz=qux,foo=bar}"), + dimension.Key("app.name{baz=xxx,foo=bar}"), + dimension.Key("app.name{baz=xxx,waldo=fred}"), + }}, + {`app.name{non-existing-key!~"bar"}`, []dimension.Key{ + dimension.Key("app.name{baz=qux,foo=bar}"), + dimension.Key("app.name{baz=xxx,foo=bar}"), + dimension.Key("app.name{baz=xxx,waldo=fred}"), + }}, + + {`app.name{foo="non-existing-value"}`, nil}, + {`app.name{foo=~"non-existing-.*"}`, nil}, + {`app.name{non-existing-key="bar"}`, nil}, + {`app.name{non-existing-key=~"bar"}`, nil}, + + {`non-existing-app{}`, nil}, + } + + for _, tc := range testCases { + qry, err := flameql.ParseQuery(tc.query) + Expect(err).ToNot(HaveOccurred()) + r := s.exec(context.TODO(), qry) + if tc.segmentKeys == nil { + Expect(r).To(BeEmpty()) + continue + } + Expect(r).To(Equal(tc.segmentKeys)) + } + }) + }) + }) +}) diff --git a/pkg/storage/key.go b/pkg/storage/segment/key.go similarity index 88% rename from pkg/storage/key.go rename to pkg/storage/segment/key.go index 08b74960d1..eaf6d7965f 100644 --- a/pkg/storage/key.go +++ b/pkg/storage/segment/key.go @@ -1,14 +1,12 @@ -package storage +package segment import ( - "encoding/binary" "regexp" "strconv" "strings" "time" "github.com/pyroscope-io/pyroscope/pkg/structs/sortedmap" - "github.com/twmb/murmur3" ) type Key struct { @@ -28,6 +26,8 @@ const ( doneParserState ) +func NewKey(labels map[string]string) *Key { return &Key{labels: labels} } + // TODO: should rewrite this at some point to not rely on regular expressions & splits func ParseKey(name string) (*Key, error) { k := &Key{ @@ -99,12 +99,16 @@ func (k *Key) SegmentKey() string { return k.Normalized() } +func segmentKeyToTreeKey(k string, depth int, t time.Time) string { + return k + ":" + strconv.Itoa(depth) + ":" + strconv.Itoa(int(t.Unix())) +} + func (k *Key) TreeKey(depth int, t time.Time) string { - return k.Normalized() + ":" + strconv.Itoa(depth) + ":" + strconv.Itoa(int(t.Unix())) + return segmentKeyToTreeKey(k.Normalized(), depth, t) } func (k *Key) DictKey() string { - return k.Normalized() + return k.labels["__name__"] } // FromTreeToDictKey returns app name from tree key k: given tree key @@ -149,15 +153,10 @@ func (k *Key) Normalized() string { return sb.String() } -func (k *Key) Hashed() []byte { - u1, u2 := murmur3.SeedSum128(seed, seed, []byte(k.Normalized())) - - b := make([]byte, 16) - binary.LittleEndian.PutUint64(b[:8], u1) - binary.LittleEndian.PutUint64(b[8:16], u2) - return b -} - func (k *Key) AppName() string { return k.labels["__name__"] } + +func (k *Key) Labels() map[string]string { + return k.labels +} diff --git a/pkg/storage/key_test.go b/pkg/storage/segment/key_test.go similarity index 95% rename from pkg/storage/key_test.go rename to pkg/storage/segment/key_test.go index b1623b05c1..2a4f762749 100644 --- a/pkg/storage/key_test.go +++ b/pkg/storage/segment/key_test.go 
@@ -1,11 +1,11 @@ -package storage +package segment import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) -var _ = Describe("storage package", func() { +var _ = Describe("segment key", func() { Context("ParseKey", func() { It("no tags version works", func() { k, err := ParseKey("foo") diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 23a86722ec..7d0ee01ce4 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -1,6 +1,7 @@ package storage import ( + "context" "errors" "fmt" "math/big" @@ -15,6 +16,7 @@ import ( "github.com/sirupsen/logrus" "github.com/pyroscope-io/pyroscope/pkg/config" + "github.com/pyroscope-io/pyroscope/pkg/flameql" "github.com/pyroscope-io/pyroscope/pkg/storage/cache" "github.com/pyroscope-io/pyroscope/pkg/storage/dict" "github.com/pyroscope-io/pyroscope/pkg/storage/dimension" @@ -180,7 +182,7 @@ func New(c *config.Server) (*Storage, error) { type PutInput struct { StartTime time.Time EndTime time.Time - Key *Key + Key *segment.Key Val *tree.Tree SpyName string SampleRate uint32 @@ -189,12 +191,9 @@ type PutInput struct { } func (s *Storage) treeFromBytes(k string, v []byte) (interface{}, error) { - key := FromTreeToDictKey(k) - d, err := s.dicts.Get(key) - if err != nil { - return nil, fmt.Errorf("dicts cache for %v: %v", key, err) - } - if d == nil { + key := segment.FromTreeToDictKey(k) + d, ok := s.dicts.Lookup(key) + if !ok { // The key not found. Fallback to segment key form which has been // used before tags support. Refer to FromTreeToDictKey. return s.treeFromBytesFallback(k, v) @@ -203,27 +202,26 @@ func (s *Storage) treeFromBytes(k string, v []byte) (interface{}, error) { } func (s *Storage) treeFromBytesFallback(k string, v []byte) (interface{}, error) { - key := FromTreeToMainKey(k) - d, err := s.dicts.Get(key) - if err != nil { - return nil, fmt.Errorf("dicts cache for %v: %v", key, err) - } - if d == nil { // key not found + key := segment.FromTreeToMainKey(k) + d, ok := s.dicts.Lookup(key) + if !ok { return nil, nil } return tree.FromBytes(d.(*dict.Dict), v) } func (s *Storage) treeBytes(k string, v interface{}) ([]byte, error) { - key := FromTreeToDictKey(k) - d, err := s.dicts.Get(key) + key := segment.FromTreeToDictKey(k) + d, err := s.dicts.GetOrCreate(key) if err != nil { return nil, fmt.Errorf("dicts cache for %v: %v", key, err) } - if d == nil { // key not found - return nil, nil + b, err := v.(*tree.Tree).Bytes(d.(*dict.Dict), s.config.MaxNodesSerialization) + if err != nil { + return nil, fmt.Errorf("dicts cache for %v: %v", key, err) } - return v.(*tree.Tree).Bytes(d.(*dict.Dict), s.config.MaxNodesSerialization) + s.dicts.Put(key, d) + return b, nil } var OutOfSpaceThreshold = 512 * bytesize.MB @@ -250,74 +248,63 @@ func (s *Storage) Put(po *PutInput) error { "aggregationType": po.AggregationType, }).Debug("storage.Put") - for k, v := range po.Key.labels { + for k, v := range po.Key.Labels() { s.labels.Put(k, v) } sk := po.Key.SegmentKey() - for k, v := range po.Key.labels { + for k, v := range po.Key.Labels() { key := k + ":" + v - res, err := s.dimensions.Get(key) + r, err := s.dimensions.GetOrCreate(key) if err != nil { logrus.Errorf("dimensions cache for %v: %v", key, err) continue } - if res != nil { - res.(*dimension.Dimension).Insert([]byte(sk)) - } + r.(*dimension.Dimension).Insert([]byte(sk)) + s.dimensions.Put(key, r) } - res, err := s.segments.Get(sk) + r, err := s.segments.GetOrCreate(sk) if err != nil { return fmt.Errorf("segments cache for %v: %v", sk, err) } - if res == nil { - return 
fmt.Errorf("segments cache for %v: not found", sk) - } - st := res.(*segment.Segment) + st := r.(*segment.Segment) st.SetMetadata(po.SpyName, po.SampleRate, po.Units, po.AggregationType) samples := po.Val.Samples() + st.Put(po.StartTime, po.EndTime, samples, func(depth int, t time.Time, r *big.Rat, addons []segment.Addon) { tk := po.Key.TreeKey(depth, t) - - res, err := s.trees.Get(tk) + res, err := s.trees.GetOrCreate(tk) if err != nil { logrus.Errorf("trees cache for %v: %v", tk, err) return } cachedTree := res.(*tree.Tree) - treeClone := po.Val.Clone(r) for _, addon := range addons { - tk2 := po.Key.TreeKey(addon.Depth, addon.T) - - res, err := s.trees.Get(tk2) - if err != nil { - logrus.Errorf("trees cache for %v: %v", tk, err) - continue - } - if res == nil { - continue + if res, ok := s.trees.Lookup(po.Key.TreeKey(addon.Depth, addon.T)); ok { + ta := res.(*tree.Tree) + ta.RLock() + treeClone.Merge(ta) + ta.RUnlock() } - treeClone.Merge(res.(*tree.Tree)) - } - if cachedTree != nil { - cachedTree.Merge(treeClone) - s.trees.Put(tk, cachedTree) - } else { - s.trees.Put(tk, treeClone) } + cachedTree.Lock() + cachedTree.Merge(treeClone) + cachedTree.Unlock() + s.trees.Put(tk, cachedTree) }) - s.segments.Put(sk, st) + s.segments.Put(sk, st) return nil } type GetInput struct { StartTime time.Time EndTime time.Time - Key *Key + Key *segment.Key + Query *flameql.Query } type GetOutput struct { @@ -329,47 +316,44 @@ type GetOutput struct { } func (s *Storage) Get(gi *GetInput) (*GetOutput, error) { - logrus.WithFields(logrus.Fields{ + logger := logrus.WithFields(logrus.Fields{ "startTime": gi.StartTime.String(), "endTime": gi.EndTime.String(), - "key": gi.Key.Normalized(), - }).Trace("storage.Get") - triesToMerge := []merge.Merger{} - - dimensions := []*dimension.Dimension{} - for k, v := range gi.Key.labels { - key := k + ":" + v - res, err := s.dimensions.Get(key) - if err != nil { - logrus.Errorf("dimensions cache for %v: %v", key, err) - continue - } - if res != nil { - dimensions = append(dimensions, res.(*dimension.Dimension)) - } - } - - segmentKeys := dimension.Intersection(dimensions...) + }) - tl := segment.GenerateTimeline(gi.StartTime, gi.EndTime) - var lastSegment *segment.Segment - var writesTotal uint64 - aggregationType := "sum" - for _, sk := range segmentKeys { + var dimensionKeys func() []dimension.Key + switch { + case gi.Key != nil: + logger = logger.WithField("key", gi.Key.Normalized()) + dimensionKeys = s.dimensionKeysByKey(gi.Key) + case gi.Query != nil: + logger = logger.WithField("query", gi.Query) + dimensionKeys = s.dimensionKeysByQuery(gi.Query) + default: + // Should never happen. 
+ return nil, fmt.Errorf("key or query must be specified") + } + + logger.Debug("storage.Get") + var ( + triesToMerge []merge.Merger + lastSegment *segment.Segment + writesTotal uint64 + + timeline = segment.GenerateTimeline(gi.StartTime, gi.EndTime) + aggregationType = "sum" + ) + + for _, k := range dimensionKeys() { // TODO: refactor, store `Key`s in dimensions - parsedKey, err := ParseKey(string(sk)) + parsedKey, err := segment.ParseKey(string(k)) if err != nil { - logrus.Errorf("parse key: %v: %v", string(sk), err) + logrus.Errorf("parse key: %v: %v", string(k), err) continue } - key := parsedKey.SegmentKey() - res, err := s.segments.Get(key) - if err != nil { - logrus.Errorf("segments cache for %v: %v", key, err) - continue - } - if res == nil { + res, ok := s.segments.Lookup(key) + if !ok { continue } @@ -377,72 +361,71 @@ func (s *Storage) Get(gi *GetInput) (*GetOutput, error) { if st.AggregationType() == "average" { aggregationType = "average" } - lastSegment = st - tl.PopulateTimeline(st) + timeline.PopulateTimeline(st) + lastSegment = st st.Get(gi.StartTime, gi.EndTime, func(depth int, samples, writes uint64, t time.Time, r *big.Rat) { - key := parsedKey.TreeKey(depth, t) - res, err := s.trees.Get(key) - if err != nil { - logrus.Errorf("trees cache for %v: %v", key, err) - return + if res, ok = s.trees.Lookup(parsedKey.TreeKey(depth, t)); ok { + triesToMerge = append(triesToMerge, res.(*tree.Tree).Clone(r)) + writesTotal += writes } - - tr := res.(*tree.Tree) - // TODO: these clones are probably are not the most efficient way of doing this - // instead this info should be passed to the merger function imo - tr2 := tr.Clone(r) - triesToMerge = append(triesToMerge, merge.Merger(tr2)) - writesTotal += writes }) } - resultTrie := merge.MergeTriesConcurrently(runtime.NumCPU(), triesToMerge...) + resultTrie := merge.MergeTriesSerially(runtime.NumCPU(), triesToMerge...) if resultTrie == nil { return nil, nil } t := resultTrie.(*tree.Tree) - if writesTotal > 0 && aggregationType == "average" { t = t.Clone(big.NewRat(1, int64(writesTotal))) } return &GetOutput{ Tree: t, - Timeline: tl, + Timeline: timeline, SpyName: lastSegment.SpyName(), SampleRate: lastSegment.SampleRate(), Units: lastSegment.Units(), }, nil } -func (s *Storage) iterateOverAllSegments(cb func(*Key, *segment.Segment) error) error { +func (s *Storage) dimensionKeysByKey(key *segment.Key) func() []dimension.Key { + return func() []dimension.Key { + var dimensions []*dimension.Dimension + for k, v := range key.Labels() { + if d, ok := s.lookupDimensionKV(k, v); ok { + dimensions = append(dimensions, d) + } + } + return dimension.Intersection(dimensions...) + } +} + +func (s *Storage) dimensionKeysByQuery(qry *flameql.Query) func() []dimension.Key { + return func() []dimension.Key { return s.exec(context.TODO(), qry) } +} + +func (s *Storage) iterateOverAllSegments(cb func(*segment.Key, *segment.Segment) error) error { nameKey := "__name__" var dimensions []*dimension.Dimension - var err error s.labels.GetValues(nameKey, func(v string) bool { - dmInt, getErr := s.dimensions.Get(nameKey + ":" + v) - dm, _ := dmInt.(*dimension.Dimension) - err = getErr - dimensions = append(dimensions, dm) - return err == nil + dmInt, ok := s.dimensions.Lookup(nameKey + ":" + v) + if !ok { + return true + } + dimensions = append(dimensions, dmInt.(*dimension.Dimension)) + return true }) - if err != nil { - return err - } - - segmentKeys := dimension.Union(dimensions...) 
- - for _, rawSk := range segmentKeys { - sk, _ := ParseKey(string(rawSk)) - - stInt, err := s.segments.Get(sk.SegmentKey()) - if err != nil { - return err + for _, rawSk := range dimension.Union(dimensions...) { + sk, _ := segment.ParseKey(string(rawSk)) + stInt, ok := s.segments.Lookup(sk.SegmentKey()) + if !ok { + continue } st := stInt.(*segment.Segment) if err := cb(sk, st); err != nil { @@ -453,7 +436,7 @@ func (s *Storage) iterateOverAllSegments(cb func(*Key, *segment.Segment) error) } func (s *Storage) DeleteDataBefore(threshold time.Time) error { - return s.iterateOverAllSegments(func(sk *Key, st *segment.Segment) error { + return s.iterateOverAllSegments(func(sk *segment.Key, st *segment.Segment) error { var err error deletedRoot := st.DeleteDataBefore(threshold, func(depth int, t time.Time) { tk := sk.TreeKey(depth, t) @@ -473,35 +456,29 @@ func (s *Storage) DeleteDataBefore(threshold time.Time) error { } type DeleteInput struct { - Key *Key + Key *segment.Key } var maxTime = time.Unix(1<<62, 999999999) func (s *Storage) Delete(di *DeleteInput) error { - dimensions := []*dimension.Dimension{} - for k, v := range di.Key.labels { - dInt, err := s.dimensions.Get(k + ":" + v) - if err != nil { + var dimensions []*dimension.Dimension + for k, v := range di.Key.Labels() { + dInt, ok := s.dimensions.Lookup(k + ":" + v) + if !ok { return nil } - d := dInt.(*dimension.Dimension) - dimensions = append(dimensions, d) + dimensions = append(dimensions, dInt.(*dimension.Dimension)) } - segmentKeys := dimension.Intersection(dimensions...) - - for _, sk := range segmentKeys { - skk, _ := ParseKey(string(sk)) - stInt, err := s.segments.Get(skk.SegmentKey()) - if err != nil { - return nil - } - st := stInt.(*segment.Segment) - if st == nil { + for _, sk := range dimension.Intersection(dimensions...) { + skk, _ := segment.ParseKey(string(sk)) + stInt, ok := s.segments.Lookup(skk.SegmentKey()) + if !ok { continue } - + st := stInt.(*segment.Segment) + var err error st.Get(zeroTime, maxTime, func(depth int, _, _ uint64, t time.Time, _ *big.Rat) { treeKey := skk.TreeKey(depth, t) err = s.trees.Delete(treeKey) @@ -516,14 +493,13 @@ func (s *Storage) Delete(di *DeleteInput) error { return nil } -func (s *Storage) deleteSegmentAndRelatedData(key *Key) error { +func (s *Storage) deleteSegmentAndRelatedData(key *segment.Key) error { s.dicts.Delete(key.DictKey()) s.segments.Delete(key.SegmentKey()) - - for k, v := range key.labels { - dInt, err := s.dimensions.Get(k + ":" + v) - if err != nil { - return err + for k, v := range key.Labels() { + dInt, ok := s.dimensions.Lookup(k + ":" + v) + if !ok { + continue } d := dInt.(*dimension.Dimension) d.Delete(dimension.Key(key.SegmentKey())) diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index bb4789124a..3c95c7e267 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -7,12 +7,14 @@ import ( . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" + "github.com/shirou/gopsutil/mem" + "github.com/sirupsen/logrus" + "github.com/pyroscope-io/pyroscope/pkg/config" + "github.com/pyroscope-io/pyroscope/pkg/storage/segment" "github.com/pyroscope-io/pyroscope/pkg/storage/tree" "github.com/pyroscope-io/pyroscope/pkg/testing" "github.com/pyroscope-io/pyroscope/pkg/util/metrics" - "github.com/shirou/gopsutil/mem" - "github.com/sirupsen/logrus" ) // 21:22:08 air | (time.Duration) 16m40s, @@ -46,7 +48,7 @@ var _ = Describe("storage package", func() { et := testing.SimpleTime(19) st2 := testing.SimpleTime(0) et2 := testing.SimpleTime(30) - key, _ := ParseKey("foo") + key, _ := segment.ParseKey("foo") s.Put(&PutInput{ StartTime: st, @@ -85,7 +87,7 @@ var _ = Describe("storage package", func() { et := testing.SimpleTime(19) st2 := testing.SimpleTime(0) et2 := testing.SimpleTime(30) - key, _ := ParseKey("foo") + key, _ := segment.ParseKey("foo") s.Put(&PutInput{ StartTime: st, @@ -132,7 +134,7 @@ var _ = Describe("storage package", func() { et := testing.SimpleTime(19) st2 := testing.SimpleTime(0) et2 := testing.SimpleTime(30) - key, _ := ParseKey("foo") + key, _ := segment.ParseKey("foo") err := s.Put(&PutInput{ StartTime: st, @@ -186,7 +188,7 @@ var _ = Describe("storage package", func() { k := string(treeKey) + strconv.Itoa(i+1) tree.Insert([]byte(k), uint64(i+1)) - key, _ := ParseKey("tree key" + strconv.Itoa(i+1)) + key, _ := segment.ParseKey("tree key" + strconv.Itoa(i+1)) err := s.Put(&PutInput{ Key: key, Val: tree, @@ -207,7 +209,7 @@ var _ = Describe("storage package", func() { et := testing.SimpleTime(19) st2 := testing.SimpleTime(0) et2 := testing.SimpleTime(30) - key, _ := ParseKey("foo") + key, _ := segment.ParseKey("foo") err := s.Put(&PutInput{ StartTime: st, @@ -240,7 +242,7 @@ var _ = Describe("storage package", func() { et := testing.SimpleTime(29) st2 := testing.SimpleTime(0) et2 := testing.SimpleTime(30) - key, _ := ParseKey("foo") + key, _ := segment.ParseKey("foo") err := s.Put(&PutInput{ StartTime: st, @@ -277,7 +279,7 @@ var _ = Describe("storage package", func() { k := string(treeKey) + strconv.Itoa(i+1) tree.Insert([]byte(k), uint64(i+1)) - key, _ := ParseKey("tree key" + strconv.Itoa(i+1)) + key, _ := segment.ParseKey("tree key" + strconv.Itoa(i+1)) err := s.Put(&PutInput{ Key: key, Val: tree, @@ -312,8 +314,8 @@ var _ = Describe("storage package", func() { st2 := testing.SimpleTime(0) et2 := testing.SimpleTime(30) - appKey, _ := ParseKey("foo") - key, _ := ParseKey("foo{tag=value}") + appKey, _ := segment.ParseKey("foo") + key, _ := segment.ParseKey("foo{tag=value}") err := s.Put(&PutInput{ StartTime: st, @@ -369,7 +371,7 @@ var _ = Describe("DeleteDataBefore", func() { tree.Insert([]byte("a;c"), uint64(2)) st := time.Now().Add(time.Hour * 24 * 10 * -1) et := st.Add(time.Second * 10) - key, _ := ParseKey("foo") + key, _ := segment.ParseKey("foo") err := s.Put(&PutInput{ StartTime: st, @@ -392,7 +394,7 @@ var _ = Describe("DeleteDataBefore", func() { tree.Insert([]byte("a;c"), uint64(2)) st := testing.SimpleTime(10) et := testing.SimpleTime(20) - key, _ := ParseKey("foo") + key, _ := segment.ParseKey("foo") err := s.Put(&PutInput{ StartTime: st, diff --git a/pkg/storage/tree/flamebearer.go b/pkg/storage/tree/flamebearer.go index 180264103c..66474ec75d 100644 --- a/pkg/storage/tree/flamebearer.go +++ b/pkg/storage/tree/flamebearer.go @@ -12,8 +12,8 @@ type Flamebearer struct { } func (t *Tree) FlamebearerStruct(maxNodes int) *Flamebearer { - t.m.RLock() - defer t.m.RUnlock() + t.RLock() + defer 
t.RUnlock() res := Flamebearer{ Names: []string{}, diff --git a/pkg/storage/tree/serialize.go b/pkg/storage/tree/serialize.go index ab31bc1777..b751db1d03 100644 --- a/pkg/storage/tree/serialize.go +++ b/pkg/storage/tree/serialize.go @@ -14,8 +14,8 @@ import ( const currentVersion = 1 func (t *Tree) Serialize(d *dict.Dict, maxNodes int, w io.Writer) error { - t.m.RLock() - defer t.m.RUnlock() + t.RLock() + defer t.RUnlock() varint.Write(w, currentVersion) @@ -57,8 +57,8 @@ func (t *Tree) Serialize(d *dict.Dict, maxNodes int, w io.Writer) error { } func (t *Tree) SerializeNoDict(maxNodes int, w io.Writer) error { - t.m.RLock() - defer t.m.RUnlock() + t.RLock() + defer t.RUnlock() nodes := []*treeNode{t.root} minVal := t.minValue(maxNodes) diff --git a/pkg/storage/tree/tree.go b/pkg/storage/tree/tree.go index d5876fa17a..620b7e015a 100644 --- a/pkg/storage/tree/tree.go +++ b/pkg/storage/tree/tree.go @@ -50,7 +50,7 @@ var ( ) type Tree struct { - m sync.RWMutex + sync.RWMutex root *treeNode } @@ -62,13 +62,12 @@ func New() *Tree { func (t *Tree) Merge(srcTrieI merge.Merger) { srcTrie := srcTrieI.(*Tree) - srcNodes := []*treeNode{srcTrie.root} - dstNodes := []*treeNode{t.root} - srcTrie.m.RLock() - defer srcTrie.m.RUnlock() - t.m.Lock() - defer t.m.Unlock() + srcNodes := make([]*treeNode, 0, 100) + srcNodes = append(srcNodes, srcTrie.root) + + dstNodes := make([]*treeNode, 0, 100) + dstNodes = append(dstNodes, t.root) for len(srcNodes) > 0 { st := srcNodes[0] @@ -82,16 +81,36 @@ func (t *Tree) Merge(srcTrieI merge.Merger) { for _, srcChildNode := range st.ChildrenNodes { dstChildNode := dt.insert(srcChildNode.Name) + srcNodes = prepend(srcNodes, srcChildNode) + dstNodes = prepend(dstNodes, dstChildNode) + } + } +} - srcNodes = append([]*treeNode{srcChildNode}, srcNodes...) - dstNodes = append([]*treeNode{dstChildNode}, dstNodes...) +func prepend(s []*treeNode, x *treeNode) []*treeNode { + if len(s) != 0 && s[0] == x { + return s + } + prev := x + for i, elem := range s { + switch { + case i == 0: + s[0] = x + prev = elem + case elem == x: + s[i] = prev + return s + default: + s[i] = prev + prev = elem } } + return append(s, prev) } func (t *Tree) String() string { - t.m.RLock() - defer t.m.RUnlock() + t.RLock() + defer t.RUnlock() res := "" t.iterate(func(k []byte, v uint64) { @@ -118,9 +137,6 @@ func (n *treeNode) insert(targetLabel []byte) *treeNode { } func (t *Tree) Insert(key []byte, value uint64, _ ...bool) { - t.m.Lock() - defer t.m.Unlock() - // TODO: can optimize this, split is not necessary? 
labels := bytes.Split(key, []byte(";")) node := t.root @@ -180,8 +196,8 @@ func (t *Tree) Samples() uint64 { } func (t *Tree) Clone(r *big.Rat) *Tree { - t.m.RLock() - defer t.m.RUnlock() + t.RLock() + defer t.RUnlock() m := uint64(r.Num().Int64()) d := uint64(r.Denom().Int64()) @@ -193,8 +209,7 @@ func (t *Tree) Clone(r *big.Rat) *Tree { } func (t *Tree) MarshalJSON() ([]byte, error) { - t.m.RLock() - defer t.m.RUnlock() - + t.RLock() + defer t.RUnlock() return json.Marshal(t.root) } diff --git a/pkg/testing/load/app.go b/pkg/testing/load/app.go new file mode 100644 index 0000000000..e19e27faf0 --- /dev/null +++ b/pkg/testing/load/app.go @@ -0,0 +1,66 @@ +package load + +import ( + "time" + + "github.com/pyroscope-io/pyroscope/pkg/storage" + "github.com/pyroscope-io/pyroscope/pkg/storage/segment" +) + +type App struct { + Name string + SpyName string + SampleRate uint32 + Units string + AggregationType string + + tags *TagsGenerator + trees *TreeGenerator +} + +type AppConfig struct { + SpyName string `yaml:"spyName"` + SampleRate uint32 `yaml:"sampleRate"` + Units string `yaml:"units"` + AggregationType string `yaml:"aggregationType"` + + Tags []Tag `yaml:"tags"` + Trees int `yaml:"trees"` + TreeConfig `yaml:"treeConfig"` +} + +type Tag struct { + Name string `yaml:"name"` + Cardinality int `yaml:"cardinality"` + MinLen int `yaml:"minLen"` + MaxLen int `yaml:"maxLen"` +} + +func NewApp(seed int, name string, c AppConfig) *App { + a := App{ + Name: name, + SpyName: c.SpyName, + SampleRate: c.SampleRate, + Units: c.Units, + AggregationType: c.AggregationType, + } + a.trees = NewTreeGenerator(seed, c.Trees, c.TreeConfig) + a.tags = NewTagGenerator(seed, name) + for _, t := range c.Tags { + a.tags.Add(t.Name, t.Cardinality, t.MinLen, t.MaxLen) + } + return &a +} + +func (a *App) CreatePutInput(from, to time.Time) *storage.PutInput { + return &storage.PutInput{ + StartTime: from, + EndTime: to, + Key: segment.NewKey(a.tags.Next()), + Val: a.trees.Next(), + SpyName: a.SpyName, + SampleRate: a.SampleRate, + Units: a.Units, + AggregationType: a.AggregationType, + } +} diff --git a/pkg/testing/load/cmd/dataloader/README.md b/pkg/testing/load/cmd/dataloader/README.md new file mode 100644 index 0000000000..bf4c4db112 --- /dev/null +++ b/pkg/testing/load/cmd/dataloader/README.md @@ -0,0 +1,19 @@ +# Data loader + +The tool populates a database with sample data. + +Generate sample data with the dataloader: +``` +go run ./pkg/testing/load/cmd/dataloader -path config.yml +``` + +See the example `config.yml` in the `dataloader` directory for details. The loader writes data directly to the storage; the storage path must be specified in the configuration file: +```yaml +storage: + path: test_storage +``` + +Start the server with the `-storage-path` option (if `pyroscope server` is run in a container, you can map the volume): +``` +pyroscope server -storage-path test_storage +``` diff --git a/pkg/testing/load/cmd/dataloader/config.yml b/pkg/testing/load/cmd/dataloader/config.yml new file mode 100644 index 0000000000..9a4b55eeb3 --- /dev/null +++ b/pkg/testing/load/cmd/dataloader/config.yml @@ -0,0 +1,49 @@ +storage: + path: test_storage + +# sources specifies how many trees are ingested per 10s interval. +# Writes are distributed evenly among all the applications. +# A new tag set is generated for every write. +sources: 1000 + +# Time window to be filled with data. +period: 1h + +# Start date in RFC3339Nano. Defaults to now() - period. +# from: "2006-01-02T15:04:05.999999999Z07:00" + +# Number of concurrent writers. 
+# writers: 4 + +apps: + test-app.cpu: + spyName: gospy + sampleRate: 100 + aggregationType: sum + units: samples + trees: 10 + treeConfig: + maxSymLen: 10 + maxDepth: 30 + width: 10 + tags: + - name: env + cardinality: 3 + minLen: 4 + maxLen: 4 + - name: region + cardinality: 5 + minLen: 4 + maxLen: 7 + - name: version + cardinality: 4 + minLen: 5 + maxLen: 5 + - name: project + cardinality: 150 + minLen: 4 + maxLen: 12 + - name: instance + cardinality: 1000 + minLen: 16 + maxLen: 16 diff --git a/pkg/testing/load/cmd/dataloader/main.go b/pkg/testing/load/cmd/dataloader/main.go new file mode 100644 index 0000000000..22072c8073 --- /dev/null +++ b/pkg/testing/load/cmd/dataloader/main.go @@ -0,0 +1,112 @@ +package main + +import ( + "flag" + "fmt" + "io/ioutil" + "log" + "os" + "time" + + "gopkg.in/yaml.v2" + + "github.com/pyroscope-io/pyroscope/pkg/config" + "github.com/pyroscope-io/pyroscope/pkg/storage" + "github.com/pyroscope-io/pyroscope/pkg/testing/load" +) + +type Config struct { + load.StorageWriteSuiteConfig `yaml:",inline"` + + Apps map[string]load.AppConfig + Storage struct{ Path string } +} + +func loadConfig(path string) (Config, error) { + var c Config + b, err := ioutil.ReadFile(path) + if err != nil { + return c, err + } + if err = yaml.Unmarshal(b, &c); err != nil { + return c, err + } + return c, nil +} + +func openStorage(path string) (*storage.Storage, error) { + if path == "" { + return nil, fmt.Errorf("storage path required") + } + if err := os.MkdirAll(path, 0755); err != nil { + return nil, err + } + return storage.New(&config.Server{ + StoragePath: path, + CacheEvictThreshold: 0.02, + CacheEvictVolume: 0.10, + MaxNodesSerialization: 2048, + MaxNodesRender: 2048, + }) +} + +func main() { + var path string + flag.StringVar(&path, "path", "config.yml", "config file path") + flag.Parse() + + c, err := loadConfig(path) + if err != nil { + log.Fatal(err) + } + + x, err := openStorage(c.Storage.Path) + if err != nil { + log.Fatal(err) + } + + c.WriteFn = func(input *storage.PutInput) { + if err = x.Put(input); err != nil { + fmt.Println(err) + } + } + + s := load.NewStorageWriteSuite(c.StorageWriteSuiteConfig) + for name, appConfig := range c.Apps { + s.AddApp(name, appConfig) + } + + start := time.Now() + t := time.NewTicker(time.Second) + defer t.Stop() + done := make(chan struct{}) + go func() { + for { + select { + case <-done: + return + case <-t.C: + stats := s.Stats() + var ( + p float32 + d time.Duration + e time.Duration + ) + if stats.RemainingPeriod > 0 { + p = float32(c.Period-stats.RemainingPeriod) * 100 / float32(c.Period) + d = time.Since(start) + e = time.Duration(100/p*float32(d)) - d + } else { + p = 100 + } + fmt.Printf("Progress: %.2f%%, estimated remaining time: %v\n", p, e) + } + } + }() + + s.Start() + close(done) + fmt.Println("Closing storage.") + x.Close() + fmt.Println("Done.") +} diff --git a/pkg/testing/load/rand.go b/pkg/testing/load/rand.go new file mode 100644 index 0000000000..f729ff4ef3 --- /dev/null +++ b/pkg/testing/load/rand.go @@ -0,0 +1,24 @@ +package load + +import ( + "encoding/hex" + "math/rand" +) + +func newRand(seed int) *rand.Rand { + return rand.New(rand.NewSource(int64(seed))) +} + +func randInt(r *rand.Rand, min, max int) int { + if max == min { + return max + } + return r.Intn(max-min) + min +} + +func randString(r *rand.Rand, min, max int) string { + l := randInt(r, min, max) + buf := make([]byte, l) + r.Read(buf) + return hex.EncodeToString(buf) +} diff --git a/pkg/testing/load/suite.go b/pkg/testing/load/suite.go new file 
mode 100644 index 0000000000..f6360bd5c1 --- /dev/null +++ b/pkg/testing/load/suite.go @@ -0,0 +1,115 @@ +package load + +import ( + "runtime" + "sync" + "time" + + "github.com/pyroscope-io/pyroscope/pkg/storage" +) + +type StorageWriteSuite struct { + apps []*App + sources int + + interval time.Duration + period time.Duration + from time.Time + + seed int + writers int + writeFn func(*storage.PutInput) +} + +type StorageWriteSuiteConfig struct { + Sources int + Interval time.Duration + Period time.Duration + From time.Time + + Seed int + Writers int + WriteFn func(*storage.PutInput) +} + +const ( + defaultInterval = 10 * time.Second + defaultRandSeed = 23061912 +) + +var defaultWriters = runtime.NumCPU() + +func NewStorageWriteSuite(c StorageWriteSuiteConfig) *StorageWriteSuite { + s := StorageWriteSuite{ + sources: c.Sources, + period: c.Period, + from: c.From, + writeFn: c.WriteFn, + interval: defaultInterval, + seed: defaultRandSeed, + writers: defaultWriters, + } + if s.writeFn == nil { + panic("WriteFn is required") + } + if s.period == 0 { + panic("Period duration is required") + } + if s.sources == 0 { + panic("Number of sources is required") + } + if s.from.IsZero() { + s.from = time.Now().Add(-s.period) + } + if c.Interval > 0 { + s.interval = c.Interval + } + if c.Seed > 0 { + s.seed = c.Seed + } + if c.Writers > 0 { + s.writers = c.Writers + } + return &s +} + +func (s *StorageWriteSuite) AddApp(name string, c AppConfig) *StorageWriteSuite { + s.apps = append(s.apps, NewApp(s.seed, name, c)) + return s +} + +type Stats struct { + RemainingPeriod time.Duration +} + +func (s *StorageWriteSuite) Stats() Stats { + return Stats{ + RemainingPeriod: s.period, + } +} + +func (s *StorageWriteSuite) Start() { + q := make(chan *storage.PutInput) + wg := new(sync.WaitGroup) + wg.Add(s.writers) + for i := 0; i < s.writers; i++ { + go func() { + defer wg.Done() + for p := range q { + s.writeFn(p) + } + }() + } + from := s.from + for s.period > 0 { + to := from.Add(s.interval) + for i := 0; i < s.sources; i++ { + a := s.apps[i%len(s.apps)] + q <- a.CreatePutInput(from, to) + } + from = to + s.period -= s.interval + } + close(q) + wg.Wait() +} diff --git a/pkg/testing/load/tag.go b/pkg/testing/load/tag.go new file mode 100644 index 0000000000..c176858abe --- /dev/null +++ b/pkg/testing/load/tag.go @@ -0,0 +1,49 @@ +package load + +import "math/rand" + +type TagsGenerator struct { + seed int + appName string + + tags []testTag + ixs []int +} + +type testTag struct { + name string + values []string +} + +func NewTagGenerator(seed int, appName string) *TagsGenerator { + return &TagsGenerator{seed: seed, appName: appName} +} + +func (g *TagsGenerator) Next() map[string]string { + k := map[string]string{"__name__": g.appName} + for i := 0; i < len(g.tags); i++ { + t := g.tags[i] + k[t.name] = t.values[g.ixs[i]%len(t.values)] + g.ixs[i]++ + } + return k +} + +func (g *TagsGenerator) Add(name string, card, min, max int) *TagsGenerator { + g.seed++ + r := newRand(g.seed) + g.ixs = append(g.ixs, 0) + g.tags = append(g.tags, testTag{ + name: name, + values: g.values(r, card, min, max), + }) + return g +} + +func (g *TagsGenerator) values(r *rand.Rand, count, min, max int) []string { + values := make([]string, count) + for i := 0; i < count; i++ { + values[i] = randString(r, min, max) + } + return values +} diff --git a/pkg/testing/load/tree.go b/pkg/testing/load/tree.go new file mode 100644 index 0000000000..d974993d24 --- /dev/null +++ b/pkg/testing/load/tree.go @@ -0,0 +1,76 @@ +package load + +import 
( + "bytes" + "encoding/hex" + "math/rand" + + "github.com/pyroscope-io/pyroscope/pkg/storage/tree" +) + +type TreeGenerator struct { + TreeConfig + seed int + + trees []*tree.Tree + symBuf []byte + b *bytes.Buffer + i int +} + +type TreeConfig struct { + MaxSymLen int `yaml:"maxSymLen"` + MaxDepth int `yaml:"maxDepth"` + Width int `yaml:"width"` +} + +var rootNode = []byte("root") + +const ( + minStackDepth = 2 + minSymLength = 3 +) + +func NewTreeGenerator(seed, trees int, c TreeConfig) *TreeGenerator { + g := TreeGenerator{ + TreeConfig: c, + seed: seed, + symBuf: make([]byte, c.MaxSymLen), + trees: make([]*tree.Tree, trees), + } + g.b = bytes.NewBuffer(make([]byte, 128)) + for i := 0; i < trees; i++ { + seed++ + g.trees[i] = g.generateTree(newRand(seed)) + } + return &g +} + +func (g *TreeGenerator) Next() *tree.Tree { + g.i++ + return g.trees[g.i%len(g.trees)] +} + +func (g *TreeGenerator) generateTree(r *rand.Rand) *tree.Tree { + t := tree.New() + for w := 0; w < g.Width; w++ { + t.Insert(g.generateStack(r), uint64(r.Intn(100)), true) + } + return t +} + +func (g *TreeGenerator) generateStack(r *rand.Rand) []byte { + g.b.Reset() + g.b.Write(rootNode) + e := hex.NewEncoder(g.b) + d := randInt(r, minStackDepth, g.MaxDepth) + for i := 0; i < d; i++ { + l := randInt(r, minSymLength, g.MaxSymLen) + r.Read(g.symBuf[:l]) + g.b.WriteString(";") + _, _ = e.Write(g.symBuf[:l]) + } + s := make([]byte, g.b.Len()) + copy(s, g.b.Bytes()) + return s +} diff --git a/scripts/generate-sample-config/main.go b/scripts/generate-sample-config/main.go index 7ed358453b..599b0ee817 100644 --- a/scripts/generate-sample-config/main.go +++ b/scripts/generate-sample-config/main.go @@ -112,7 +112,7 @@ func writeConfigDocs(w io.Writer, subcommand, format string) { cli.PopulateFlagSet(val, flagSet, append(opts, cli.WithSkip("group-name", "user-name", "no-root-drop"))...) case "target": val = new(config.Target) - cli.PopulateFlagSet(val, flagSet, opts...) + cli.PopulateFlagSet(val, flagSet, append(opts, cli.WithSkip("tags"))...) default: log.Fatalf("Unknown subcommand %q", subcommand) } @@ -139,9 +139,10 @@ func writeYaml(w io.Writer, sf *cli.SortedFlags) { return } var v string - if reflect.TypeOf(f.Value).Elem().Kind() == reflect.Slice { + switch reflect.TypeOf(f.Value).Elem().Kind() { + case reflect.Slice, reflect.Map: v = f.Value.String() - } else { + default: v = fmt.Sprintf("%q", f.Value) } _, _ = fmt.Fprintf(w, "# %s\n%s: %s\n\n", toPrettySentence(f.Usage), f.Name, v)