diff --git a/tools/pd-api-bench/cases/cases.go b/tools/pd-api-bench/cases/cases.go index 79d2ce873c5..59ea6337115 100644 --- a/tools/pd-api-bench/cases/cases.go +++ b/tools/pd-api-bench/cases/cases.go @@ -54,6 +54,24 @@ func InitCluster(ctx context.Context, cli pd.Client, httpCli pdHttp.Client) erro return nil } +// Config is the configuration for the case. +type Config struct { + QPS int64 `toml:"qps" json:"qps"` + Burst int64 `toml:"burst" json:"burst"` +} + +func newConfig() *Config { + return &Config{ + Burst: 1, + } +} + +// Clone returns a cloned configuration. +func (c *Config) Clone() *Config { + cfg := *c + return &cfg +} + // Case is the interface for all cases. type Case interface { Name() string @@ -61,12 +79,12 @@ type Case interface { GetQPS() int64 SetBurst(int64) GetBurst() int64 + GetConfig() *Config } type baseCase struct { - name string - qps int64 - burst int64 + name string + cfg *Config } func (c *baseCase) Name() string { @@ -74,19 +92,23 @@ func (c *baseCase) Name() string { } func (c *baseCase) SetQPS(qps int64) { - c.qps = qps + c.cfg.QPS = qps } func (c *baseCase) GetQPS() int64 { - return c.qps + return c.cfg.QPS } func (c *baseCase) SetBurst(burst int64) { - c.burst = burst + c.cfg.Burst = burst } func (c *baseCase) GetBurst() int64 { - return c.burst + return c.cfg.Burst +} + +func (c *baseCase) GetConfig() *Config { + return c.cfg.Clone() } // GRPCCase is the interface for all gRPC cases. @@ -100,11 +122,12 @@ type GRPCCraeteFn func() GRPCCase // GRPCCaseFnMap is the map for all gRPC case creation function. var GRPCCaseFnMap = map[string]GRPCCraeteFn{ - "GetRegion": newGetRegion(), - "GetStore": newGetStore(), - "GetStores": newGetStores(), - "ScanRegions": newScanRegions(), - "Tso": newTso(), + "GetRegion": newGetRegion(), + "GetRegionEnableFollower": newGetRegionEnableFollower(), + "GetStore": newGetStore(), + "GetStores": newGetStores(), + "ScanRegions": newScanRegions(), + "Tso": newTso(), } // GRPCCaseMap is the map for all gRPC case creation function. @@ -136,9 +159,8 @@ func newMinResolvedTS() func() HTTPCase { return func() HTTPCase { return &minResolvedTS{ baseCase: &baseCase{ - name: "GetMinResolvedTS", - qps: 1000, - burst: 1, + name: "GetMinResolvedTS", + cfg: newConfig(), }, } } @@ -164,9 +186,8 @@ func newRegionStats() func() HTTPCase { return func() HTTPCase { return ®ionsStats{ baseCase: &baseCase{ - name: "GetRegionStatus", - qps: 100, - burst: 1, + name: "GetRegionStatus", + cfg: newConfig(), }, regionSample: 1000, } @@ -200,9 +221,8 @@ func newGetRegion() func() GRPCCase { return func() GRPCCase { return &getRegion{ baseCase: &baseCase{ - name: "GetRegion", - qps: 10000, - burst: 1, + name: "GetRegion", + cfg: newConfig(), }, } } @@ -217,6 +237,30 @@ func (c *getRegion) Unary(ctx context.Context, cli pd.Client) error { return nil } +type getRegionEnableFollower struct { + *baseCase +} + +func newGetRegionEnableFollower() func() GRPCCase { + return func() GRPCCase { + return &getRegionEnableFollower{ + baseCase: &baseCase{ + name: "GetRegionEnableFollower", + cfg: newConfig(), + }, + } + } +} + +func (c *getRegionEnableFollower) Unary(ctx context.Context, cli pd.Client) error { + id := rand.Intn(totalRegion)*4 + 1 + _, err := cli.GetRegion(ctx, generateKeyForSimulator(id, 56), pd.WithAllowFollowerHandle()) + if err != nil { + return err + } + return nil +} + type scanRegions struct { *baseCase regionSample int @@ -226,9 +270,8 @@ func newScanRegions() func() GRPCCase { return func() GRPCCase { return &scanRegions{ baseCase: &baseCase{ - name: "ScanRegions", - qps: 10000, - burst: 1, + name: "ScanRegions", + cfg: newConfig(), }, regionSample: 10000, } @@ -255,9 +298,8 @@ func newTso() func() GRPCCase { return func() GRPCCase { return &tso{ baseCase: &baseCase{ - name: "Tso", - qps: 10000, - burst: 1, + name: "Tso", + cfg: newConfig(), }, } } @@ -279,9 +321,8 @@ func newGetStore() func() GRPCCase { return func() GRPCCase { return &getStore{ baseCase: &baseCase{ - name: "GetStore", - qps: 10000, - burst: 1, + name: "GetStore", + cfg: newConfig(), }, } } @@ -304,9 +345,8 @@ func newGetStores() func() GRPCCase { return func() GRPCCase { return &getStores{ baseCase: &baseCase{ - name: "GetStores", - qps: 10000, - burst: 1, + name: "GetStores", + cfg: newConfig(), }, } } diff --git a/tools/pd-api-bench/cases/controller.go b/tools/pd-api-bench/cases/controller.go new file mode 100644 index 00000000000..2a4561a3d2a --- /dev/null +++ b/tools/pd-api-bench/cases/controller.go @@ -0,0 +1,268 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cases + +import ( + "context" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + pd "github.com/tikv/pd/client" + pdHttp "github.com/tikv/pd/client/http" + "go.uber.org/zap" +) + +var base = int64(time.Second) / int64(time.Microsecond) + +// Coordinator managers the operation of the gRPC and HTTP case. +type Coordinator struct { + ctx context.Context + + httpClients []pdHttp.Client + gRPCClients []pd.Client + + http map[string]*httpController + grpc map[string]*gRPCController + + mu sync.RWMutex +} + +// NewCoordinator returns a new coordinator. +func NewCoordinator(ctx context.Context, httpClients []pdHttp.Client, gRPCClients []pd.Client) *Coordinator { + return &Coordinator{ + ctx: ctx, + httpClients: httpClients, + gRPCClients: gRPCClients, + http: make(map[string]*httpController), + grpc: make(map[string]*gRPCController), + } +} + +// GetHTTPCase returns the HTTP case config. +func (c *Coordinator) GetHTTPCase(name string) (*Config, error) { + c.mu.RLock() + defer c.mu.RUnlock() + if controller, ok := c.http[name]; ok { + return controller.GetConfig(), nil + } + return nil, errors.Errorf("case %v does not exist.", name) +} + +// GetGRPCCase returns the gRPC case config. +func (c *Coordinator) GetGRPCCase(name string) (*Config, error) { + c.mu.RLock() + defer c.mu.RUnlock() + if controller, ok := c.grpc[name]; ok { + return controller.GetConfig(), nil + } + return nil, errors.Errorf("case %v does not exist.", name) +} + +// GetAllHTTPCases returns the all HTTP case configs. +func (c *Coordinator) GetAllHTTPCases() map[string]*Config { + c.mu.RLock() + defer c.mu.RUnlock() + ret := make(map[string]*Config) + for name, c := range c.http { + ret[name] = c.GetConfig() + } + return ret +} + +// GetAllGRPCCases returns the all gRPC case configs. +func (c *Coordinator) GetAllGRPCCases() map[string]*Config { + c.mu.RLock() + defer c.mu.RUnlock() + ret := make(map[string]*Config) + for name, c := range c.grpc { + ret[name] = c.GetConfig() + } + return ret +} + +// SetHTTPCase sets the config for the specific case. +func (c *Coordinator) SetHTTPCase(name string, cfg *Config) error { + c.mu.Lock() + defer c.mu.Unlock() + if fn, ok := HTTPCaseFnMap[name]; ok { + var controller *httpController + if controller, ok = c.http[name]; !ok { + controller = newHTTPController(c.ctx, c.httpClients, fn) + c.http[name] = controller + } + controller.stop() + controller.SetQPS(cfg.QPS) + if cfg.Burst > 0 { + controller.SetBurst(cfg.Burst) + } + controller.run() + } else { + return errors.Errorf("HTTP case %s not implemented", name) + } + return nil +} + +// SetGRPCCase sets the config for the specific case. +func (c *Coordinator) SetGRPCCase(name string, cfg *Config) error { + c.mu.Lock() + defer c.mu.Unlock() + if fn, ok := GRPCCaseFnMap[name]; ok { + var controller *gRPCController + if controller, ok = c.grpc[name]; !ok { + controller = newGRPCController(c.ctx, c.gRPCClients, fn) + c.grpc[name] = controller + } + controller.stop() + controller.SetQPS(cfg.QPS) + if cfg.Burst > 0 { + controller.SetBurst(cfg.Burst) + } + controller.run() + } else { + return errors.Errorf("HTTP case %s not implemented", name) + } + return nil +} + +type httpController struct { + HTTPCase + clients []pdHttp.Client + pctx context.Context + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +func newHTTPController(ctx context.Context, clis []pdHttp.Client, fn HTTPCraeteFn) *httpController { + c := &httpController{ + pctx: ctx, + clients: clis, + HTTPCase: fn(), + } + return c +} + +// run tries to run the HTTP api bench. +func (c *httpController) run() { + if c.GetQPS() <= 0 || c.cancel != nil { + return + } + c.ctx, c.cancel = context.WithCancel(c.pctx) + qps := c.GetQPS() + burst := c.GetBurst() + cliNum := int64(len(c.clients)) + tt := time.Duration(base/qps*burst*cliNum) * time.Microsecond + log.Info("begin to run http case", zap.String("case", c.Name()), zap.Int64("qps", qps), zap.Int64("burst", burst), zap.Duration("interval", tt)) + for _, hCli := range c.clients { + c.wg.Add(1) + go func(hCli pdHttp.Client) { + defer c.wg.Done() + var ticker = time.NewTicker(tt) + defer ticker.Stop() + for { + select { + case <-ticker.C: + for i := int64(0); i < burst; i++ { + err := c.Do(c.ctx, hCli) + if err != nil { + log.Error("meet erorr when doing HTTP request", zap.String("case", c.Name()), zap.Error(err)) + } + } + case <-c.ctx.Done(): + log.Info("Got signal to exit running HTTP case") + return + } + } + }(hCli) + } +} + +// stop stops the HTTP api bench. +func (c *httpController) stop() { + if c.cancel == nil { + return + } + c.cancel() + c.cancel = nil + c.wg.Wait() +} + +type gRPCController struct { + GRPCCase + clients []pd.Client + pctx context.Context + + ctx context.Context + cancel context.CancelFunc + + wg sync.WaitGroup +} + +func newGRPCController(ctx context.Context, clis []pd.Client, fn GRPCCraeteFn) *gRPCController { + c := &gRPCController{ + pctx: ctx, + clients: clis, + GRPCCase: fn(), + } + return c +} + +// run tries to run the gRPC api bench. +func (c *gRPCController) run() { + if c.GetQPS() <= 0 || c.cancel != nil { + return + } + c.ctx, c.cancel = context.WithCancel(c.pctx) + qps := c.GetQPS() + burst := c.GetBurst() + cliNum := int64(len(c.clients)) + tt := time.Duration(base/qps*burst*cliNum) * time.Microsecond + log.Info("begin to run gRPC case", zap.String("case", c.Name()), zap.Int64("qps", qps), zap.Int64("burst", burst), zap.Duration("interval", tt)) + for _, cli := range c.clients { + c.wg.Add(1) + go func(cli pd.Client) { + defer c.wg.Done() + var ticker = time.NewTicker(tt) + defer ticker.Stop() + for { + select { + case <-ticker.C: + for i := int64(0); i < burst; i++ { + err := c.Unary(c.ctx, cli) + if err != nil { + log.Error("meet erorr when doing gRPC request", zap.String("case", c.Name()), zap.Error(err)) + } + } + case <-c.ctx.Done(): + log.Info("Got signal to exit running gRPC case") + return + } + } + }(cli) + } +} + +// stop stops the gRPC api bench. +func (c *gRPCController) stop() { + if c.cancel == nil { + return + } + c.cancel() + c.cancel = nil + c.wg.Wait() +} diff --git a/tools/pd-api-bench/config/config.go b/tools/pd-api-bench/config/config.go index 783a7ee463a..b2b254ad766 100644 --- a/tools/pd-api-bench/config/config.go +++ b/tools/pd-api-bench/config/config.go @@ -29,6 +29,7 @@ type Config struct { flagSet *flag.FlagSet configFile string PDAddr string `toml:"pd" json:"pd"` + StatusAddr string `toml:"status" json:"status"` Log log.Config `toml:"log" json:"log"` Logger *zap.Logger @@ -42,13 +43,8 @@ type Config struct { KeyPath string `toml:"key-path" json:"key-path"` // only for init - HTTP map[string]caseConfig `toml:"http" json:"http"` - GRPC map[string]caseConfig `toml:"grpc" json:"grpc"` -} - -type caseConfig struct { - QPS int64 `toml:"qps" json:"qps"` - Burst int64 `toml:"burst" json:"burst"` + HTTP map[string]cases.Config `toml:"http" json:"http"` + GRPC map[string]cases.Config `toml:"grpc" json:"grpc"` } // NewConfig return a set of settings. @@ -58,6 +54,7 @@ func NewConfig(flagSet *flag.FlagSet) *Config { fs := cfg.flagSet fs.StringVar(&cfg.configFile, "config", "", "config file") fs.StringVar(&cfg.PDAddr, "pd", "http://127.0.0.1:2379", "pd address") + fs.StringVar(&cfg.StatusAddr, "status", "127.0.0.1:10081", "status address") fs.Int64Var(&cfg.Client, "client", 1, "client number") fs.StringVar(&cfg.CaPath, "cacert", "", "path of file that contains list of trusted SSL CAs") fs.StringVar(&cfg.CertPath, "cert", "", "path of file that contains X509 certificate in PEM format") @@ -93,43 +90,23 @@ func (c *Config) Parse(arguments []string) error { return errors.Errorf("'%s' is an invalid flag", c.flagSet.Arg(0)) } + return nil +} + +// InitCoordinator set case config from config itself. +func (c *Config) InitCoordinator(co *cases.Coordinator) { for name, cfg := range c.HTTP { - if fn, ok := cases.HTTPCaseFnMap[name]; ok { - var cas cases.HTTPCase - if cas, ok = cases.HTTPCaseMap[name]; !ok { - cas = fn() - cases.HTTPCaseMap[name] = cas - } - if cfg.QPS > 0 { - cas.SetQPS(cfg.QPS) - } - if cfg.Burst > 0 { - cas.SetBurst(cfg.Burst) - } - } else { - log.Warn("HTTP case not implemented", zap.String("case", name)) + err := co.SetHTTPCase(name, &cfg) + if err != nil { + log.Error("create HTTP case failed", zap.Error(err)) } } - for name, cfg := range c.GRPC { - if fn, ok := cases.GRPCCaseFnMap[name]; ok { - var cas cases.GRPCCase - if cas, ok = cases.GRPCCaseMap[name]; !ok { - cas = fn() - cases.GRPCCaseMap[name] = cas - } - if cfg.QPS > 0 { - cas.SetQPS(cfg.QPS) - } - if cfg.Burst > 0 { - cas.SetBurst(cfg.Burst) - } - } else { - log.Warn("gRPC case not implemented", zap.String("case", name)) + err := co.SetGRPCCase(name, &cfg) + if err != nil { + log.Error("create gRPC case failed", zap.Error(err)) } } - - return nil } // Adjust is used to adjust configurations diff --git a/tools/pd-api-bench/main.go b/tools/pd-api-bench/main.go index 56e7ee761b2..681c3579012 100644 --- a/tools/pd-api-bench/main.go +++ b/tools/pd-api-bench/main.go @@ -17,6 +17,7 @@ package main import ( "context" "crypto/tls" + "net/http" "os" "os/signal" "strconv" @@ -24,12 +25,18 @@ import ( "syscall" "time" + "github.com/gin-contrib/cors" + "github.com/gin-contrib/gzip" + "github.com/gin-contrib/pprof" + "github.com/gin-gonic/gin" "github.com/pingcap/log" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" flag "github.com/spf13/pflag" pd "github.com/tikv/pd/client" pdHttp "github.com/tikv/pd/client/http" "github.com/tikv/pd/client/tlsutil" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/tools/pd-api-bench/cases" "github.com/tikv/pd/tools/pd-api-bench/config" @@ -39,18 +46,40 @@ import ( ) var ( - qps = flag.Int64("qps", 1000, "qps") - burst = flag.Int64("burst", 1, "burst") - - httpCases = flag.String("http-cases", "", "http api cases") - gRPCCases = flag.String("grpc-cases", "", "grpc cases") + qps, burst int64 + httpCases, gRPCCases string ) -var base = int64(time.Second) / int64(time.Microsecond) +var ( + pdAPIExecutionHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "pd", + Subsystem: "api_bench", + Name: "pd_api_execution_duration_seconds", + Help: "Bucketed histogram of all pd api execution time (s)", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms ~ 524s + }, []string{"type"}) + + pdAPIRequestCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "api_bench", + Name: "pd_api_request_total", + Help: "Counter of the pd http api requests", + }, []string{"type", "result"}) +) func main() { + prometheus.MustRegister(pdAPIExecutionHistogram) + prometheus.MustRegister(pdAPIRequestCounter) + + ctx, cancel := context.WithCancel(context.Background()) flagSet := flag.NewFlagSet("api-bench", flag.ContinueOnError) flagSet.ParseErrorsWhitelist.UnknownFlags = true + flagSet.Int64Var(&qps, "qps", 1, "qps") + flagSet.Int64Var(&burst, "burst", 1, "burst") + flagSet.StringVar(&httpCases, "http-cases", "", "http api cases") + flagSet.StringVar(&gRPCCases, "grpc-cases", "", "grpc cases") cfg := config.NewConfig(flagSet) err := cfg.Parse(os.Args[1:]) defer logutil.LogPanic() @@ -68,7 +97,6 @@ func main() { } else { log.Fatal("initialize logger error", zap.Error(err)) } - ctx, cancel := context.WithCancel(context.Background()) sc := make(chan os.Signal, 1) signal.Notify(sc, syscall.SIGHUP, @@ -82,75 +110,6 @@ func main() { cancel() }() - hcaseStr := strings.Split(*httpCases, ",") - for _, str := range hcaseStr { - caseQPS := int64(0) - caseBurst := int64(0) - cStr := "" - - strs := strings.Split(str, "-") - // to get case name - strsa := strings.Split(strs[0], "+") - cStr = strsa[0] - // to get case Burst - if len(strsa) > 1 { - caseBurst, err = strconv.ParseInt(strsa[1], 10, 64) - if err != nil { - log.Error("parse burst failed for case", zap.String("case", cStr), zap.String("config", strsa[1])) - } - } - // to get case qps - if len(strs) > 1 { - strsb := strings.Split(strs[1], "+") - caseQPS, err = strconv.ParseInt(strsb[0], 10, 64) - if err != nil { - if err != nil { - log.Error("parse qps failed for case", zap.String("case", cStr), zap.String("config", strsb[0])) - } - } - // to get case Burst - if len(strsb) > 1 { - caseBurst, err = strconv.ParseInt(strsb[1], 10, 64) - if err != nil { - log.Error("parse burst failed for case", zap.String("case", cStr), zap.String("config", strsb[1])) - } - } - } - if len(cStr) == 0 { - continue - } - if fn, ok := cases.HTTPCaseFnMap[cStr]; ok { - var cas cases.HTTPCase - if cas, ok = cases.HTTPCaseMap[cStr]; !ok { - cas = fn() - cases.HTTPCaseMap[cStr] = cas - } - if caseBurst > 0 { - cas.SetBurst(caseBurst) - } else if *burst > 0 { - cas.SetBurst(*burst) - } - if caseQPS > 0 { - cas.SetQPS(caseQPS) - } else if *qps > 0 { - cas.SetQPS(*qps) - } - } else { - log.Warn("HTTP case not implemented", zap.String("case", cStr)) - } - } - gcaseStr := strings.Split(*gRPCCases, ",") - // todo: see pull 7345 - for _, str := range gcaseStr { - if fn, ok := cases.GRPCCaseFnMap[str]; ok { - if _, ok = cases.GRPCCaseMap[str]; !ok { - cases.GRPCCaseMap[str] = fn() - } - } else { - log.Warn("gRPC case not implemented", zap.String("case", str)) - } - } - if cfg.Client == 0 { log.Error("concurrency == 0, exit") return @@ -158,23 +117,39 @@ func main() { pdClis := make([]pd.Client, cfg.Client) for i := int64(0); i < cfg.Client; i++ { pdClis[i] = newPDClient(ctx, cfg) + pdClis[i].UpdateOption(pd.EnableFollowerHandle, true) } httpClis := make([]pdHttp.Client, cfg.Client) for i := int64(0); i < cfg.Client; i++ { sd := pdClis[i].GetServiceDiscovery() - httpClis[i] = pdHttp.NewClientWithServiceDiscovery("tools-api-bench", sd, pdHttp.WithTLSConfig(loadTLSConfig(cfg))) + httpClis[i] = pdHttp.NewClientWithServiceDiscovery("tools-api-bench", sd, pdHttp.WithTLSConfig(loadTLSConfig(cfg)), pdHttp.WithMetrics(pdAPIRequestCounter, pdAPIExecutionHistogram)) } err = cases.InitCluster(ctx, pdClis[0], httpClis[0]) if err != nil { log.Fatal("InitCluster error", zap.Error(err)) } - for _, hcase := range cases.HTTPCaseMap { - handleHTTPCase(ctx, hcase, httpClis) + coordinator := cases.NewCoordinator(ctx, httpClis, pdClis) + + hcaseStr := strings.Split(httpCases, ",") + for _, str := range hcaseStr { + name, cfg := parseCaseNameAndConfig(str) + if len(name) == 0 { + continue + } + coordinator.SetHTTPCase(name, cfg) } - for _, gcase := range cases.GRPCCaseMap { - handleGRPCCase(ctx, gcase, pdClis) + gcaseStr := strings.Split(gRPCCases, ",") + for _, str := range gcaseStr { + name, cfg := parseCaseNameAndConfig(str) + if len(name) == 0 { + continue + } + coordinator.SetGRPCCase(name, cfg) } + cfg.InitCoordinator(coordinator) + + go runHTTPServer(cfg, coordinator) <-ctx.Done() for _, cli := range pdClis { @@ -192,64 +167,144 @@ func main() { } } -func handleGRPCCase(ctx context.Context, gcase cases.GRPCCase, clients []pd.Client) { - qps := gcase.GetQPS() - burst := gcase.GetBurst() - cliNum := int64(len(clients)) - tt := time.Duration(base/qps*burst*cliNum) * time.Microsecond - log.Info("begin to run gRPC case", zap.String("case", gcase.Name()), zap.Int64("qps", qps), zap.Int64("burst", burst), zap.Duration("interval", tt)) - for _, cli := range clients { - go func(cli pd.Client) { - var ticker = time.NewTicker(tt) - defer ticker.Stop() - for { - select { - case <-ticker.C: - for i := int64(0); i < burst; i++ { - err := gcase.Unary(ctx, cli) - if err != nil { - log.Error("meet erorr when doing gRPC request", zap.String("case", gcase.Name()), zap.Error(err)) - } - } - case <-ctx.Done(): - log.Info("Got signal to exit handleGetRegion") - return - } +func exit(code int) { + os.Exit(code) +} + +func parseCaseNameAndConfig(str string) (string, *cases.Config) { + var err error + cfg := &cases.Config{} + name := "" + strs := strings.Split(str, "-") + // to get case name + strsa := strings.Split(strs[0], "+") + name = strsa[0] + // to get case Burst + if len(strsa) > 1 { + cfg.Burst, err = strconv.ParseInt(strsa[1], 10, 64) + if err != nil { + log.Error("parse burst failed for case", zap.String("case", name), zap.String("config", strsa[1])) + } + } + // to get case qps + if len(strs) > 1 { + strsb := strings.Split(strs[1], "+") + cfg.QPS, err = strconv.ParseInt(strsb[0], 10, 64) + if err != nil { + if err != nil { + log.Error("parse qps failed for case", zap.String("case", name), zap.String("config", strsb[0])) + } + } + // to get case Burst + if len(strsb) > 1 { + cfg.Burst, err = strconv.ParseInt(strsb[1], 10, 64) + if err != nil { + log.Error("parse burst failed for case", zap.String("case", name), zap.String("config", strsb[1])) } - }(cli) + } + } + if cfg.QPS == 0 && qps > 0 { + cfg.QPS = qps + } + if cfg.Burst == 0 && burst > 0 { + cfg.Burst = burst } + return name, cfg } -func handleHTTPCase(ctx context.Context, hcase cases.HTTPCase, httpClis []pdHttp.Client) { - qps := hcase.GetQPS() - burst := hcase.GetBurst() - cliNum := int64(len(httpClis)) - tt := time.Duration(base/qps*burst*cliNum) * time.Microsecond - log.Info("begin to run http case", zap.String("case", hcase.Name()), zap.Int64("qps", qps), zap.Int64("burst", burst), zap.Duration("interval", tt)) - for _, hCli := range httpClis { - go func(hCli pdHttp.Client) { - var ticker = time.NewTicker(tt) - defer ticker.Stop() - for { - select { - case <-ticker.C: - for i := int64(0); i < burst; i++ { - err := hcase.Do(ctx, hCli) - if err != nil { - log.Error("meet erorr when doing HTTP request", zap.String("case", hcase.Name()), zap.Error(err)) - } - } - case <-ctx.Done(): - log.Info("Got signal to exit handleScanRegions") - return - } +func runHTTPServer(cfg *config.Config, co *cases.Coordinator) { + gin.SetMode(gin.ReleaseMode) + engine := gin.New() + engine.Use(gin.Recovery()) + engine.Use(cors.Default()) + engine.Use(gzip.Gzip(gzip.DefaultCompression)) + engine.GET("metrics", utils.PromHandler()) + // profile API + pprof.Register(engine) + + getCfg := func(c *gin.Context) *cases.Config { + var err error + cfg := &cases.Config{} + qpsStr := c.Query("qps") + if len(qpsStr) > 0 { + cfg.QPS, err = strconv.ParseInt(qpsStr, 10, 64) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + } + } + burstStr := c.Query("burst") + if len(burstStr) > 0 { + cfg.Burst, err = strconv.ParseInt(burstStr, 10, 64) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) } - }(hCli) + } + return cfg } -} -func exit(code int) { - os.Exit(code) + engine.POST("config/http/all", func(c *gin.Context) { + var input map[string]cases.Config + if err := c.ShouldBindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + for name, cfg := range input { + co.SetHTTPCase(name, &cfg) + } + c.String(http.StatusOK, "") + }) + engine.POST("config/http/:name", func(c *gin.Context) { + name := c.Param("name") + cfg := getCfg(c) + co.SetHTTPCase(name, cfg) + c.String(http.StatusOK, "") + }) + engine.POST("config/grpc/all", func(c *gin.Context) { + var input map[string]cases.Config + if err := c.ShouldBindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + for name, cfg := range input { + co.SetGRPCCase(name, &cfg) + } + c.String(http.StatusOK, "") + }) + engine.POST("config/grpc/:name", func(c *gin.Context) { + name := c.Param("name") + cfg := getCfg(c) + co.SetGRPCCase(name, cfg) + c.String(http.StatusOK, "") + }) + + engine.GET("config/http/all", func(c *gin.Context) { + all := co.GetAllHTTPCases() + c.IndentedJSON(http.StatusOK, all) + }) + engine.GET("config/http/:name", func(c *gin.Context) { + name := c.Param("name") + cfg, err := co.GetHTTPCase(name) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, cfg) + }) + engine.GET("config/grpc/all", func(c *gin.Context) { + all := co.GetAllGRPCCases() + c.IndentedJSON(http.StatusOK, all) + }) + engine.GET("config/grpc/:name", func(c *gin.Context) { + name := c.Param("name") + cfg, err := co.GetGRPCCase(name) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, cfg) + }) + // nolint + engine.Run(cfg.StatusAddr) } func trimHTTPPrefix(str string) string {