Skip to content

Commit 1536b75

Browse files
committed
pyrobench: basic load generation
1 parent 580ed5f commit 1536b75

File tree

6 files changed

+230
-14
lines changed

6 files changed

+230
-14
lines changed

benchmark/cmd/command/loadgen.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,19 @@
11
package command
22

33
import (
4-
"fmt"
5-
64
"github.com/pyroscope-io/pyroscope/benchmark/config"
5+
"github.com/pyroscope-io/pyroscope/benchmark/loadgen"
76
"github.com/pyroscope-io/pyroscope/pkg/cli"
87
"github.com/spf13/cobra"
98
)
109

11-
func newLoadGen(cfg *config.Config) *cobra.Command {
10+
func newLoadGen(cfg *config.LoadGen) *cobra.Command {
1211
vpr := newViper()
1312
loadgenCmd := &cobra.Command{
1413
Use: "loadgen [flags]",
1514
Short: "Generates load",
1615
RunE: createCmdRunFn(cfg, vpr, false, func(cmd *cobra.Command, args []string, logger config.LoggerFunc) error {
17-
fmt.Println("address", cfg.ServerAddress)
18-
19-
return nil
16+
return loadgen.Cli(cfg)
2017
}),
2118
}
2219

benchmark/cmd/command/root.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"github.com/spf13/cobra"
1212
)
1313

14-
func newRootCmd(cfg *config.Config) *cobra.Command {
14+
func newRootCmd(cfg *config.LoadGen) *cobra.Command {
1515
rootCmd := &cobra.Command{
1616
Use: "pyrobench [flags] <subcommand>",
1717
}
@@ -32,7 +32,7 @@ func newRootCmd(cfg *config.Config) *cobra.Command {
3232

3333
// Initialize adds all child commands to the root command and sets flags appropriately
3434
func Initialize() error {
35-
var cfg config.Config
35+
var cfg config.LoadGen
3636

3737
rootCmd := newRootCmd(&cfg)
3838
rootCmd.SilenceErrors = true

benchmark/cmd/logging.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copied as is from github.com/pyroscope-io/pyroscope/cmd/logging.go
2+
package main
3+
4+
import (
5+
"log"
6+
"os"
7+
"runtime"
8+
9+
"github.com/fatih/color"
10+
"github.com/sirupsen/logrus"
11+
)
12+
13+
func init() {
14+
log.SetFlags(log.Lshortfile | log.Ldate | log.Ltime)
15+
16+
logrus.SetFormatter(&logrus.TextFormatter{})
17+
logrus.SetOutput(os.Stdout)
18+
logrus.SetLevel(logrus.DebugLevel)
19+
20+
if runtime.GOOS == "windows" {
21+
color.NoColor = true
22+
}
23+
}

benchmark/config/config.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
package config
22

3-
type Config struct {
4-
Version bool `mapstructure:"version"`
5-
}
3+
type LoadGen struct {
4+
LogLevel string `def:"info" desc:"log level: debug|info|warn|error" mapstructure:"log-level"`
5+
6+
ServerAddress string `def:"http://localhost:4040" desc:"address of the pyroscope instance being attacked" mapstructure:"server-address"`
7+
RandSeed int `def:"23061912" desc:""`
8+
ProfileWidth int `def:"20"`
9+
ProfileDepth int `def:"20"`
10+
ProfileSymbolLength int `def:"30"`
11+
Fixtures int `def:"30" desc:"how many different profiles to generate per app"`
12+
Apps int `def:"20" desc:"how many pyroscope apps to emulate"`
13+
Clients int `def:"20" desc:"how many pyroscope clients to emulate"`
14+
Requests int `def:"10000" desc:"how many requests each clients should make"`
615

7-
type LoggerFunc func(s string)
8-
type LoggerConfiger interface{ InitializeLogging() LoggerFunc }
9-
type FileConfiger interface{ ConfigFilePath() string }
16+
WaitUntilAvailable bool `def:"true" desc:"wait until endpoint is available"`
17+
}

benchmark/config/interface.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package config
2+
3+
import (
4+
"github.com/sirupsen/logrus"
5+
)
6+
7+
type FileConfiger interface{ ConfigFilePath() string }
8+
9+
type LoggerFunc func(s string)
10+
type LoggerConfiger interface{ InitializeLogging() LoggerFunc }
11+
12+
func (cfg LoadGen) InitializeLogging() LoggerFunc {
13+
if l, err := logrus.ParseLevel(cfg.LogLevel); err == nil {
14+
logrus.SetLevel(l)
15+
}
16+
17+
return nil
18+
}

benchmark/loadgen/loadgen.go

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package loadgen
2+
3+
import (
4+
"encoding/hex"
5+
"fmt"
6+
"math/rand"
7+
"net/http"
8+
"sync"
9+
"time"
10+
11+
"github.com/pyroscope-io/pyroscope/benchmark/config"
12+
"github.com/pyroscope-io/pyroscope/pkg/agent/upstream"
13+
"github.com/pyroscope-io/pyroscope/pkg/agent/upstream/remote"
14+
"github.com/pyroscope-io/pyroscope/pkg/structs/transporttrie"
15+
"github.com/sirupsen/logrus"
16+
)
17+
18+
// how many retries to check the pyroscope server is up
19+
const MaxReadinessRetries = 10
20+
21+
type Fixtures [][]*transporttrie.Trie
22+
23+
type LoadGen struct {
24+
Config *config.LoadGen
25+
Rand *rand.Rand
26+
SymbolBuf []byte
27+
}
28+
29+
func Cli(cfg *config.LoadGen) error {
30+
r := rand.New(rand.NewSource(int64(cfg.RandSeed)))
31+
l := &LoadGen{
32+
Config: cfg,
33+
Rand: r,
34+
SymbolBuf: make([]byte, cfg.ProfileSymbolLength),
35+
}
36+
37+
return l.Run(cfg)
38+
}
39+
40+
func (l *LoadGen) Run(cfg *config.LoadGen) error {
41+
logrus.Info("checking server is available...")
42+
err := waitUntilEndpointReady(cfg.ServerAddress)
43+
if err != nil {
44+
return err
45+
}
46+
47+
logrus.Info("generating fixtures")
48+
fixtures := l.generateFixtures()
49+
logrus.Debug("done generating fixtures.")
50+
51+
logrus.Info("starting sending requests")
52+
wg := sync.WaitGroup{}
53+
wg.Add(l.Config.Apps * l.Config.Clients)
54+
appNameBuf := make([]byte, 25)
55+
56+
for i := 0; i < l.Config.Apps; i++ {
57+
// generate a random app name
58+
l.Rand.Read(appNameBuf)
59+
appName := hex.EncodeToString(appNameBuf)
60+
for j := 0; j < l.Config.Clients; j++ {
61+
go l.startClientThread(appName, &wg, fixtures[i])
62+
}
63+
}
64+
wg.Wait()
65+
66+
logrus.Debug("done sending requests")
67+
return nil
68+
}
69+
70+
func (l *LoadGen) generateFixtures() Fixtures {
71+
var f Fixtures
72+
73+
for i := 0; i < l.Config.Apps; i++ {
74+
f = append(f, []*transporttrie.Trie{})
75+
76+
randomGen := rand.New(rand.NewSource(int64(l.Config.RandSeed + i)))
77+
p := l.generateProfile(randomGen)
78+
for j := 0; j < l.Config.Fixtures; j++ {
79+
f[i] = append(f[i], p)
80+
}
81+
}
82+
83+
return f
84+
}
85+
86+
func (l *LoadGen) startClientThread(appName string, wg *sync.WaitGroup, appFixtures []*transporttrie.Trie) {
87+
rc := remote.RemoteConfig{
88+
UpstreamThreads: 1,
89+
UpstreamAddress: l.Config.ServerAddress,
90+
UpstreamRequestTimeout: 10 * time.Second,
91+
}
92+
r, err := remote.New(rc, logrus.StandardLogger())
93+
if err != nil {
94+
panic(err)
95+
}
96+
97+
requestsCount := l.Config.Requests
98+
99+
threadStartTime := time.Now().Truncate(10 * time.Second)
100+
threadStartTime = threadStartTime.Add(time.Duration(-1*requestsCount) * (10 * time.Second))
101+
102+
st := threadStartTime
103+
104+
for i := 0; i < requestsCount; i++ {
105+
t := appFixtures[i%len(appFixtures)]
106+
107+
st = st.Add(10 * time.Second)
108+
et := st.Add(10 * time.Second)
109+
err := r.UploadSync(&upstream.UploadJob{
110+
Name: appName + "{}",
111+
StartTime: st,
112+
EndTime: et,
113+
SpyName: "gospy",
114+
SampleRate: 100,
115+
Units: "samples",
116+
AggregationType: "sum",
117+
Trie: t,
118+
})
119+
if err != nil {
120+
// TODO(eh-am): calculate errors
121+
time.Sleep(time.Second)
122+
} else {
123+
// TODO(eh-am): calculate success
124+
}
125+
}
126+
127+
wg.Done()
128+
}
129+
130+
func (l *LoadGen) generateProfile(randomGen *rand.Rand) *transporttrie.Trie {
131+
t := transporttrie.New()
132+
133+
for w := 0; w < l.Config.ProfileWidth; w++ {
134+
symbol := []byte("root")
135+
for d := 0; d < 2+l.Rand.Intn(l.Config.ProfileDepth); d++ {
136+
randomGen.Read(l.SymbolBuf)
137+
symbol = append(symbol, byte(';'))
138+
symbol = append(symbol, []byte(hex.EncodeToString(l.SymbolBuf))...)
139+
if l.Rand.Intn(100) <= 20 {
140+
t.Insert(symbol, uint64(l.Rand.Intn(100)), true)
141+
}
142+
}
143+
144+
t.Insert(symbol, uint64(l.Rand.Intn(100)), true)
145+
}
146+
return t
147+
}
148+
149+
// TODO(eh-am) exponential backoff and whatnot
150+
func waitUntilEndpointReady(url string) error {
151+
client := http.Client{Timeout: 10 * time.Second}
152+
retries := 0
153+
154+
for {
155+
_, err := client.Get(url)
156+
157+
// all good?
158+
if err == nil {
159+
return nil
160+
}
161+
if retries >= MaxReadinessRetries {
162+
break
163+
}
164+
165+
time.Sleep(time.Second)
166+
retries++
167+
}
168+
169+
return fmt.Errorf("maximum retries exceeded ('%d') waiting for server ('%s') to respond", retries, url)
170+
}

0 commit comments

Comments
 (0)