Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tags support #280

Merged
merged 25 commits into from
Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
cf9203c
Optimize tree merge
kolesnikovae Jul 12, 2021
53eec15
Fix cache persistence
kolesnikovae Jul 13, 2021
9a1a942
Add docs for loader tool
kolesnikovae Jul 13, 2021
1274b01
Remove unnecessary tree locks
kolesnikovae Jul 13, 2021
2a5533f
Update default loader config
kolesnikovae Jul 13, 2021
3bf1041
Add pyroQL query parser
kolesnikovae Jul 14, 2021
55d6b2b
Implement basic query engine
kolesnikovae Jul 14, 2021
cce9b30
Refactor tests
kolesnikovae Jul 15, 2021
99800a9
Extend storage Get to support queries
kolesnikovae Jul 15, 2021
2e1673a
Extend HTTP API to support query parameter
kolesnikovae Jul 15, 2021
db02f23
Extend agent session to support tags
kolesnikovae Jul 15, 2021
d488dc7
Remove redundant url encoding
kolesnikovae Jul 16, 2021
b819c71
Rename pypoql to flameql
kolesnikovae Jul 16, 2021
fb0be5b
Extend agent CLI and config to support tags
kolesnikovae Jul 16, 2021
0f49c19
Add tag key validation
kolesnikovae Jul 16, 2021
f038774
Refine tag key naming
kolesnikovae Jul 16, 2021
39e4cdb
Merge app name with tags
kolesnikovae Jul 16, 2021
420c8f2
Move tag merge logic to session
kolesnikovae Jul 19, 2021
9221916
Fix query result intersection
kolesnikovae Jul 19, 2021
998a1d4
Update script for docs gen
kolesnikovae Jul 19, 2021
64208cf
Merge branch 'main' into feature/tags-support
kolesnikovae Jul 19, 2021
9efa297
Solve merge conflicts, refactor router
kolesnikovae Jul 19, 2021
69960d6
Allow commas in tag values
kolesnikovae Jul 19, 2021
0ec59fc
Remove unused code
kolesnikovae Jul 20, 2021
26b117d
Move storage Key to segment package
kolesnikovae Jul 20, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/dgraph-io/badger/v2 v2.2007.2
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/dgrijalva/lfu-go v0.0.0-20141010002404-f174e76c5138
github.com/dgrijalva/lfu-go v0.0.0-00010101000000-000000000000
github.com/fatih/color v1.10.0
github.com/felixge/fgprof v0.9.1
github.com/go-ole/go-ole v1.2.5 // indirect
Expand Down Expand Up @@ -58,4 +58,4 @@ require (

replace github.com/mgechev/revive v1.0.3 => github.com/pyroscope-io/revive v1.0.6-0.20210330033039-4a71146f9dc1

replace github.com/dgrijalva/lfu-go => github.com/pyroscope-io/lfu-go v1.0.3
replace github.com/dgrijalva/lfu-go => github.com/pyroscope-io/lfu-go v1.0.4-0.20210705073127-18ae647b0d6e
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3x
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/pyroscope-io/dotnetdiag v1.2.1 h1:3XEMrfFJnZ87BiEhozyQKmCUAuMd/Spq7KChPuD2Cf0=
github.com/pyroscope-io/dotnetdiag v1.2.1/go.mod h1:eFUEHCp4eD1TgcXMlJihC+R4MrqGf7nTRdWxNADbDHA=
github.com/pyroscope-io/lfu-go v1.0.3 h1:jWwHf5MtfHioSYga2LrBEjmXluUloNQrKa3XXLFqjMA=
github.com/pyroscope-io/lfu-go v1.0.3/go.mod h1:3W9sGrDLhKFkHZPXkz6c5dAKrxcwkKbFFKnJtDukMDA=
github.com/pyroscope-io/lfu-go v1.0.4-0.20210705073127-18ae647b0d6e h1:r17BNAAXLsBRJWeRyROGSxont3ob/GmF4Qz2e68+Rfc=
github.com/pyroscope-io/lfu-go v1.0.4-0.20210705073127-18ae647b0d6e/go.mod h1:3W9sGrDLhKFkHZPXkz6c5dAKrxcwkKbFFKnJtDukMDA=
github.com/pyroscope-io/revive v1.0.6-0.20210330033039-4a71146f9dc1 h1:0v9lBNgdmVtpyyk9PP/DfpJlOHkXriu5YgNlrhQw5YE=
github.com/pyroscope-io/revive v1.0.6-0.20210330033039-4a71146f9dc1/go.mod h1:tSw34BaGZ0iF+oVKDOjq1/LuxGifgW7shaJ6+dBYFXg=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
Expand Down
15 changes: 9 additions & 6 deletions pkg/agent/profiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var (

type Config struct {
ApplicationName string // e.g backend.purchases
Tags map[string]string
ServerAddress string // e.g http://pyroscope.services.internal:4040
AuthToken string // specify this token when using pyroscope cloud
SampleRate uint32
Expand Down Expand Up @@ -62,6 +63,7 @@ func Start(cfg Config) (*Profiler, error) {
sc := agent.SessionConfig{
Upstream: upstream,
AppName: cfg.ApplicationName,
Tags: cfg.Tags,
ProfilingTypes: types.DefaultProfileTypes,
DisableGCRuns: cfg.DisableGCRuns,
SpyName: types.GoSpy,
Expand All @@ -70,14 +72,15 @@ func Start(cfg Config) (*Profiler, error) {
Pid: 0,
WithSubprocesses: false,
}
session := agent.NewSession(&sc, cfg.Logger)
if err := session.Start(); err != nil {
return nil, fmt.Errorf("start session: %v", err)
session, err := agent.NewSession(&sc, cfg.Logger)
if err != nil {
return nil, fmt.Errorf("new session: %w", err)
}
if err = session.Start(); err != nil {
return nil, fmt.Errorf("start session: %w", err)
}

return &Profiler{
session: session,
}, nil
return &Profiler{session: session}, nil
}

// Stop stops continious profiling session
Expand Down
72 changes: 68 additions & 4 deletions pkg/agent/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
_ "github.com/pyroscope-io/pyroscope/pkg/agent/rbspy"
"github.com/pyroscope-io/pyroscope/pkg/agent/types"
"github.com/pyroscope-io/pyroscope/pkg/agent/upstream"
"github.com/pyroscope-io/pyroscope/pkg/flameql"
"github.com/pyroscope-io/pyroscope/pkg/storage/segment"
"github.com/pyroscope-io/pyroscope/pkg/util/slices"

// revive:enable:blank-imports
Expand Down Expand Up @@ -47,11 +49,16 @@ type ProfileSession struct {
stopTime time.Time

logger Logger

// The map holds fully qualified names (storage keys) for
// every profiler that is running within the session.
names map[spy.ProfileType]string
}

type SessionConfig struct {
Upstream upstream.Upstream
AppName string
Tags map[string]string
ProfilingTypes []spy.ProfileType
DisableGCRuns bool
SpyName string
Expand All @@ -61,10 +68,11 @@ type SessionConfig struct {
WithSubprocesses bool
}

func NewSession(c *SessionConfig, logger Logger) *ProfileSession {
func NewSession(c *SessionConfig, logger Logger) (*ProfileSession, error) {
ps := &ProfileSession{
upstream: c.Upstream,
appName: c.AppName,
names: make(map[spy.ProfileType]string),
spyName: c.SpyName,
profileTypes: c.ProfilingTypes,
disableGCRuns: c.DisableGCRuns,
Expand All @@ -84,7 +92,64 @@ func NewSession(c *SessionConfig, logger Logger) *ProfileSession {
ps.tries = make([]*transporttrie.Trie, 1)
}

return ps
if err := ps.createNames(c.Tags); err != nil {
return nil, err
}

return ps, nil
}

func (ps *ProfileSession) createNames(tags map[string]string) error {
for _, t := range ps.profileTypes {
tagsCopy := make(map[string]string)
for k, v := range tags {
tagsCopy[k] = v
}
appName, err := mergeTagsWithAppName(ps.appName, tagsCopy)
if err != nil {
return err
}
tagsCopy["__name__"] = appName + "." + string(t)
ps.names[t] = segment.NewKey(tagsCopy).Normalized()
}
return nil
}

// mergeTagsWithAppName validates user input and merges explicitly specified
// tags with tags from app name.
//
// App name may be in the full form including tags (app.name{foo=bar,baz=qux}).
// Returned application name is always short, any tags that were included are
// moved to tags map. When merged with explicitly provided tags (config/CLI),
// last take precedence.
//
// App name may be an empty string. Tags must not contain reserved keys,
// the map is modified in place.
func mergeTagsWithAppName(appName string, tags map[string]string) (string, error) {
k, err := segment.ParseKey(appName)
if err != nil {
return "", err
}
appName = k.AppName()
if tags == nil {
return appName, nil
}
// Note that at this point k may contain
// reserved tag keys (e.g. '__name__').
for tagKey, v := range k.Labels() {
if flameql.IsTagKeyReserved(tagKey) {
continue
}
if _, ok := tags[tagKey]; !ok {
tags[tagKey] = v
}
}
for tagKey := range tags {
if err = flameql.ValidateTagKey(tagKey); err != nil {
return "", err
}
}
return appName, nil
}

func (ps *ProfileSession) takeSnapshots() {
Expand Down Expand Up @@ -223,9 +288,8 @@ func (ps *ProfileSession) uploadTries(now time.Time) {
}

if !skipUpload {
name := ps.appName + "." + string(ps.profileTypes[i])
ps.upstream.Upload(&upstream.UploadJob{
Name: name,
Name: ps.names[ps.profileTypes[i]],
StartTime: ps.startTime,
EndTime: endTime,
SpyName: ps.spyName,
Expand Down
79 changes: 51 additions & 28 deletions pkg/agent/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,61 +8,84 @@ import (
. "github.com/onsi/gomega"
"github.com/pyroscope-io/pyroscope/pkg/agent/spy"
"github.com/pyroscope-io/pyroscope/pkg/agent/upstream"
"github.com/pyroscope-io/pyroscope/pkg/config"
"github.com/pyroscope-io/pyroscope/pkg/structs/transporttrie"
"github.com/pyroscope-io/pyroscope/pkg/testing"
"github.com/sirupsen/logrus"
)

const durThreshold = 30 * time.Millisecond

type upstreamMock struct {
tries []*transporttrie.Trie
uploads []*upstream.UploadJob
}

func (*upstreamMock) Stop() {

}
func (*upstreamMock) Stop() {}

func (u *upstreamMock) Upload(j *upstream.UploadJob) {
u.tries = append(u.tries, j.Trie)
u.uploads = append(u.uploads, j)
}

var _ = Describe("agent.Session", func() {
testing.WithConfig(func(cfg **config.Config) {
Describe("NewSession", func() {
It("creates a new session and performs chunking", func(done Done) {
Describe("NewSession", func() {
It("creates a new session and performs chunking", func(done Done) {
u := &upstreamMock{}
uploadRate := 200 * time.Millisecond
s, _ := NewSession(&SessionConfig{
Upstream: u,
AppName: "test-app",
ProfilingTypes: []spy.ProfileType{spy.ProfileCPU},
SpyName: "debugspy",
SampleRate: 100,
UploadRate: uploadRate,
Pid: os.Getpid(),
WithSubprocesses: true,
}, logrus.StandardLogger())
now := time.Now()
time.Sleep(now.Truncate(uploadRate).Add(uploadRate + 10*time.Millisecond).Sub(now))
err := s.Start()

Expect(err).ToNot(HaveOccurred())
time.Sleep(500 * time.Millisecond)
s.Stop()

Expect(u.uploads).To(HaveLen(3))
u.uploads[0].Trie.Iterate(func(name []byte, val uint64) {
Expect(val).To(BeNumerically("~", 19, 2))
})
u.uploads[1].Trie.Iterate(func(name []byte, val uint64) {
Expect(val).To(BeNumerically("~", 20, 2))
})
u.uploads[2].Trie.Iterate(func(name []byte, val uint64) {
Expect(val).To(BeNumerically("~", 11, 2))
})
close(done)
})

When("tags specified", func() {
It("name ", func() {
u := &upstreamMock{}
uploadRate := 200 * time.Millisecond
s := NewSession(&SessionConfig{
c := &SessionConfig{
Upstream: u,
AppName: "test-app",
AppName: "test-app{bar=xxx}",
ProfilingTypes: []spy.ProfileType{spy.ProfileCPU},
SpyName: "debugspy",
SampleRate: 100,
UploadRate: uploadRate,
Pid: os.Getpid(),
WithSubprocesses: true,
}, logrus.StandardLogger())
Tags: map[string]string{
"foo": "bar",
"baz": "qux",
},
}

s, _ := NewSession(c, logrus.StandardLogger())
now := time.Now()
time.Sleep(now.Truncate(uploadRate).Add(uploadRate + 10*time.Millisecond).Sub(now))
err := s.Start()

Expect(err).ToNot(HaveOccurred())
time.Sleep(500 * time.Millisecond)
s.Stop()

Expect(u.tries).To(HaveLen(3))
u.tries[0].Iterate(func(name []byte, val uint64) {
Expect(val).To(BeNumerically("~", 19, 2))
})
u.tries[1].Iterate(func(name []byte, val uint64) {
Expect(val).To(BeNumerically("~", 20, 2))
})
u.tries[2].Iterate(func(name []byte, val uint64) {
Expect(val).To(BeNumerically("~", 11, 2))
})
close(done)
Expect(u.uploads).To(HaveLen(3))
Expect(u.uploads[0].Name).To(Equal("test-app.cpu{bar=xxx,baz=qux,foo=bar}"))
})
})
})
Expand Down
6 changes: 5 additions & 1 deletion pkg/agent/target/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func newServiceTarget(logger *logrus.Logger, upstream *remote.Remote, t config.T
sc: &agent.SessionConfig{
Upstream: upstream,
AppName: t.ApplicationName,
Tags: t.Tags,
ProfilingTypes: []spy.ProfileType{spy.ProfileCPU},
SpyName: t.SpyName,
SampleRate: uint32(t.SampleRate),
Expand Down Expand Up @@ -68,7 +69,10 @@ func (s *service) wait(ctx context.Context) error {
pyspy.Blocking = s.target.PyspyBlocking
rbspy.Blocking = s.target.RbspyBlocking

session := agent.NewSession(s.sc, s.logger)
session, err := agent.NewSession(s.sc, s.logger)
if err != nil {
return err
}
if err := session.Start(); err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/upstream/direct/direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/pyroscope-io/pyroscope/pkg/agent/upstream"
"github.com/pyroscope-io/pyroscope/pkg/storage"
"github.com/pyroscope-io/pyroscope/pkg/storage/segment"
"github.com/pyroscope-io/pyroscope/pkg/storage/tree"
)

Expand Down Expand Up @@ -51,7 +52,7 @@ func (u *Direct) Upload(j *upstream.UploadJob) {
}

func (u *Direct) uploadProfile(j *upstream.UploadJob) {
key, err := storage.ParseKey(j.Name)
key, err := segment.ParseKey(j.Name)
if err != nil {
logrus.WithField("key", key).Error("invalid key:")
return
Expand Down
25 changes: 23 additions & 2 deletions pkg/cli/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ func (svc *agentService) Stop(_ service.Service) error {
return nil
}

func loadTargets(c *config.Agent) error {
// loadAgentConfig is a hack for ff parser, which can't parse maps, structs,
// and slices. The function to be called after the parser finishes just to
// fill missing configuration elements.
func loadAgentConfig(c *config.Agent) error {
b, err := ioutil.ReadFile(c.Config)
switch {
case err == nil:
Expand All @@ -64,10 +67,28 @@ func loadTargets(c *config.Agent) error {
if err = yaml.Unmarshal(b, &a); err != nil {
return err
}
c.Targets = a.Targets
// Override tags from config file with flags.
c.Tags = mergeTags(a.Tags, c.Tags)
for _, t := range a.Targets {
t.Tags = mergeTags(t.Tags, c.Tags)
c.Targets = append(c.Targets, t)
}
return nil
}

// mergeTags creates a new map with tags from a and b.
// Values from b take precedence. Returned map is never nil.
func mergeTags(a, b map[string]string) map[string]string {
t := make(map[string]string, len(a))
for k, v := range a {
t[k] = v
}
for k, v := range b {
t[k] = v
}
return t
}

func createLogger(config *config.Agent) (*logrus.Logger, error) {
if config.NoLogging {
logrus.SetOutput(ioutil.Discard)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/agent_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func startAgent(config *config.Agent) error {
return fmt.Errorf("could not create logger: %w", err)
}
logger.Info("starting pyroscope agent")
if err = loadTargets(config); err != nil {
if err = loadAgentConfig(config); err != nil {
return fmt.Errorf("could not load targets: %w", err)
}
agent, err := newAgentService(logger, config)
Expand Down
2 changes: 0 additions & 2 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ func generateRootCmd(cfg *config.Config) *ffcli.Command {
fmt.Println(DefaultUsageFunc(execSortedFlags, execCmd, []string{}))
return nil
}

err := exec.Cli(&cfg.Exec, args)
// Normally, if the program ran, the call should return ExitError and
// the exit code must be preserved. Otherwise, the error originates from
Expand All @@ -163,7 +162,6 @@ func generateRootCmd(cfg *config.Config) *ffcli.Command {
fmt.Println(DefaultUsageFunc(connectSortedFlags, connectCmd, []string{}))
return nil
}

return exec.Cli(&cfg.Exec, args)
}

Expand Down
Loading