diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 55c9adf66d9..5ed9747e923 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -216,7 +216,6 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) { func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID uint64) { tsoServiceKey := discovery.TSOPath(clusterID) - tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey) putFn := func(kv *mvccpb.KeyValue) error { s := &discovery.ServiceRegistryEntry{} @@ -249,7 +248,7 @@ func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID ui putFn, deleteFn, func([]*clientv3.Event) error { return nil }, - clientv3.WithRange(tsoServiceEndKey), + true, /* withPrefix */ ) } diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index 8c4ed015a5c..8b48fde611e 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -324,7 +324,8 @@ func getOperatorByRegion(c *gin.Context) { // @Tags operators // @Summary List operators. -// @Param kind query string false "Specify the operator kind." Enums(admin, leader, region, waiting) +// @Param kind query string false "Specify the operator kind." Enums(admin, leader, region, waiting) +// @Param object query bool false "Whether to return as JSON object." // @Produce json // @Success 200 {array} operator.Operator // @Failure 500 {string} string "PD server failed to proceed the request." @@ -337,6 +338,7 @@ func getOperators(c *gin.Context) { ) kinds := c.QueryArray("kind") + _, objectFlag := c.GetQuery("object") if len(kinds) == 0 { results, err = handler.GetOperators() } else { @@ -347,7 +349,15 @@ func getOperators(c *gin.Context) { c.String(http.StatusInternalServerError, err.Error()) return } - c.IndentedJSON(http.StatusOK, results) + if objectFlag { + objResults := make([]*operator.OpObject, len(results)) + for i, op := range results { + objResults[i] = op.ToJSONObject() + } + c.IndentedJSON(http.StatusOK, objResults) + } else { + c.IndentedJSON(http.StatusOK, results) + } } // @Tags operator diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index 32028592504..8db5e656279 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -146,6 +146,7 @@ func (cw *Watcher) initializeConfigWatcher() error { func([]*clientv3.Event) error { return nil }, putFn, deleteFn, func([]*clientv3.Event) error { return nil }, + false, /* withPrefix */ ) cw.configWatcher.StartWatchLoop() return cw.configWatcher.WaitLoad() @@ -176,7 +177,7 @@ func (cw *Watcher) initializeTTLConfigWatcher() error { func([]*clientv3.Event) error { return nil }, putFn, deleteFn, func([]*clientv3.Event) error { return nil }, - clientv3.WithPrefix(), + true, /* withPrefix */ ) cw.ttlConfigWatcher.StartWatchLoop() return cw.ttlConfigWatcher.WaitLoad() @@ -217,7 +218,7 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error { func([]*clientv3.Event) error { return nil }, putFn, deleteFn, func([]*clientv3.Event) error { return nil }, - clientv3.WithPrefix(), + true, /* withPrefix */ ) cw.schedulerConfigWatcher.StartWatchLoop() return cw.schedulerConfigWatcher.WaitLoad() diff --git a/pkg/mcs/scheduling/server/meta/watcher.go b/pkg/mcs/scheduling/server/meta/watcher.go index 808e8fc565e..925b28763b5 100644 --- a/pkg/mcs/scheduling/server/meta/watcher.go +++ b/pkg/mcs/scheduling/server/meta/watcher.go @@ -111,7 +111,7 @@ func (w *Watcher) initializeStoreWatcher() error { func([]*clientv3.Event) error { return nil }, putFn, deleteFn, func([]*clientv3.Event) error { return nil }, - clientv3.WithPrefix(), + true, /* withPrefix */ ) w.storeWatcher.StartWatchLoop() return w.storeWatcher.WaitLoad() diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index 3e11cf9ff9d..c345ef037df 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -204,7 +204,7 @@ func (rw *Watcher) initializeRuleWatcher() error { preEventsFn, putFn, deleteFn, postEventsFn, - clientv3.WithPrefix(), + true, /* withPrefix */ ) rw.ruleWatcher.StartWatchLoop() return rw.ruleWatcher.WaitLoad() @@ -232,7 +232,7 @@ func (rw *Watcher) initializeRegionLabelWatcher() error { func([]*clientv3.Event) error { return nil }, putFn, deleteFn, func([]*clientv3.Event) error { return nil }, - clientv3.WithPrefix(), + true, /* withPrefix */ ) rw.labelWatcher.StartWatchLoop() return rw.labelWatcher.WaitLoad() diff --git a/pkg/schedule/operator/operator.go b/pkg/schedule/operator/operator.go index a8c54e824fb..b87a050969f 100644 --- a/pkg/schedule/operator/operator.go +++ b/pkg/schedule/operator/operator.go @@ -149,6 +149,39 @@ func (o *Operator) MarshalJSON() ([]byte, error) { return []byte(`"` + o.String() + `"`), nil } +// OpObject is used to return Operator as a json object for API. +type OpObject struct { + Desc string `json:"desc"` + Brief string `json:"brief"` + RegionID uint64 `json:"region_id"` + RegionEpoch *metapb.RegionEpoch `json:"region_epoch"` + Kind OpKind `json:"kind"` + Timeout string `json:"timeout"` + Status OpStatus `json:"status"` +} + +// ToJSONObject serializes Operator as JSON object. +func (o *Operator) ToJSONObject() *OpObject { + var status OpStatus + if o.CheckSuccess() { + status = SUCCESS + } else if o.CheckTimeout() { + status = TIMEOUT + } else { + status = o.Status() + } + + return &OpObject{ + Desc: o.desc, + Brief: o.brief, + RegionID: o.regionID, + RegionEpoch: o.regionEpoch, + Kind: o.kind, + Timeout: o.timeout.String(), + Status: status, + } +} + // Desc returns the operator's short description. func (o *Operator) Desc() string { return o.desc diff --git a/pkg/schedule/operator/operator_test.go b/pkg/schedule/operator/operator_test.go index 1b0ff8385bf..4719df9408b 100644 --- a/pkg/schedule/operator/operator_test.go +++ b/pkg/schedule/operator/operator_test.go @@ -541,3 +541,36 @@ func (suite *operatorTestSuite) TestRecord() { re.Equal(now, ob.FinishTime) re.Greater(ob.duration.Seconds(), time.Second.Seconds()) } + +func (suite *operatorTestSuite) TestToJSONObject() { + steps := []OpStep{ + AddPeer{ToStore: 1, PeerID: 1}, + TransferLeader{FromStore: 3, ToStore: 1}, + RemovePeer{FromStore: 3}, + } + op := suite.newTestOperator(101, OpLeader|OpRegion, steps...) + op.Start() + obj := op.ToJSONObject() + suite.Equal("test", obj.Desc) + suite.Equal("test", obj.Brief) + suite.Equal(uint64(101), obj.RegionID) + suite.Equal(OpLeader|OpRegion, obj.Kind) + suite.Equal("12m0s", obj.Timeout) + suite.Equal(STARTED, obj.Status) + + // Test SUCCESS status. + region := suite.newTestRegion(1, 1, [2]uint64{1, 1}, [2]uint64{2, 2}) + suite.Nil(op.Check(region)) + suite.Equal(SUCCESS, op.Status()) + obj = op.ToJSONObject() + suite.Equal(SUCCESS, obj.Status) + + // Test TIMEOUT status. + steps = []OpStep{TransferLeader{FromStore: 2, ToStore: 1}} + op = suite.newTestOperator(1, OpLeader, steps...) + op.Start() + op.SetStatusReachTime(STARTED, op.GetStartTime().Add(-FastStepWaitTime-time.Second)) + suite.True(op.CheckTimeout()) + obj = op.ToJSONObject() + suite.Equal(TIMEOUT, obj.Status) +} diff --git a/pkg/schedule/prepare_checker.go b/pkg/schedule/prepare_checker.go index 7843249229b..126e3bba41d 100644 --- a/pkg/schedule/prepare_checker.go +++ b/pkg/schedule/prepare_checker.go @@ -60,6 +60,10 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti continue } storeID := store.GetID() + // It is used to avoid sudden scheduling when scheduling service is just started. + if len(collectWaitTime) > 0 && (float64(store.GetStoreStats().GetRegionCount())*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID))) { + return false + } // For each store, the number of active regions should be more than total region of the store * collectFactor if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) { return false diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index c48c066a2aa..35bb7681fd0 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -22,7 +22,6 @@ import ( "net/http" "regexp" "sort" - "strings" "sync" "time" @@ -485,8 +484,6 @@ func (kgm *KeyspaceGroupManager) GetServiceConfig() ServiceConfig { // Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress} // Value: discover.ServiceRegistryEntry func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error { - tsoServiceEndKey := clientv3.GetPrefixRangeEnd(kgm.tsoServiceKey) + "/" - putFn := func(kv *mvccpb.KeyValue) error { s := &discovery.ServiceRegistryEntry{} if err := json.Unmarshal(kv.Value, s); err != nil { @@ -518,7 +515,7 @@ func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error { putFn, deleteFn, func([]*clientv3.Event) error { return nil }, - clientv3.WithRange(tsoServiceEndKey), + true, /* withPrefix */ ) kgm.tsoNodesWatcher.StartWatchLoop() if err := kgm.tsoNodesWatcher.WaitLoad(); err != nil { @@ -535,9 +532,7 @@ func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error { // Value: endpoint.KeyspaceGroup func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { rootPath := kgm.legacySvcRootPath - startKey := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(mcsutils.DefaultKeyspaceGroupID)}, "/") - endKey := strings.Join( - []string{rootPath, clientv3.GetPrefixRangeEnd(endpoint.KeyspaceGroupIDPrefix())}, "/") + startKey := rootPath + "/" + endpoint.KeyspaceGroupIDPrefix() defaultKGConfigured := false putFn := func(kv *mvccpb.KeyValue) error { @@ -577,7 +572,7 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { putFn, deleteFn, postEventsFn, - clientv3.WithRange(endKey), + true, /* withPrefix */ ) if kgm.loadKeyspaceGroupsTimeout > 0 { kgm.groupWatcher.SetLoadTimeout(kgm.loadKeyspaceGroupsTimeout) diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 11c640fe4ef..798e8c1ad15 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -558,12 +558,10 @@ func InitOrGetClusterID(c *clientv3.Client, key string) (uint64, error) { } const ( - defaultLoadDataFromEtcdTimeout = 30 * time.Second - defaultLoadFromEtcdRetryInterval = 200 * time.Millisecond - defaultLoadFromEtcdRetryTimes = int(defaultLoadDataFromEtcdTimeout / defaultLoadFromEtcdRetryInterval) - defaultLoadBatchSize = 400 - defaultWatchChangeRetryInterval = 1 * time.Second - defaultForceLoadMinimalInterval = 200 * time.Millisecond + defaultLoadDataFromEtcdTimeout = 5 * time.Minute + defaultEtcdRetryInterval = time.Second + defaultLoadFromEtcdRetryTimes = 3 + defaultLoadBatchSize = 400 // RequestProgressInterval is the interval to call RequestProgress for watcher. RequestProgressInterval = 1 * time.Second @@ -580,8 +578,8 @@ type LoopWatcher struct { // key is the etcd key to watch. key string - // opts is used to set etcd options. - opts []clientv3.OpOption + // isWithPrefix indicates whether the watcher is with prefix. + isWithPrefix bool // forceLoadCh is used to force loading data from etcd. forceLoadCh chan struct{} @@ -623,7 +621,7 @@ func NewLoopWatcher( preEventsFn func([]*clientv3.Event) error, putFn, deleteFn func(*mvccpb.KeyValue) error, postEventsFn func([]*clientv3.Event) error, - opts ...clientv3.OpOption, + isWithPrefix bool, ) *LoopWatcher { return &LoopWatcher{ ctx: ctx, @@ -638,12 +636,12 @@ func NewLoopWatcher( deleteFn: deleteFn, postEventsFn: postEventsFn, preEventsFn: preEventsFn, - opts: opts, + isWithPrefix: isWithPrefix, lastTimeForceLoad: time.Now(), loadTimeout: defaultLoadDataFromEtcdTimeout, loadRetryTimes: defaultLoadFromEtcdRetryTimes, loadBatchSize: defaultLoadBatchSize, - watchChangeRetryInterval: defaultWatchChangeRetryInterval, + watchChangeRetryInterval: defaultEtcdRetryInterval, } } @@ -689,11 +687,8 @@ func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 { watchStartRevision int64 err error ) - ticker := time.NewTicker(defaultLoadFromEtcdRetryInterval) + ticker := time.NewTicker(defaultEtcdRetryInterval) defer ticker.Stop() - ctx, cancel := context.WithTimeout(ctx, lw.loadTimeout) - defer cancel() - for i := 0; i < lw.loadRetryTimes; i++ { failpoint.Inject("loadTemporaryFail", func(val failpoint.Value) { if maxFailTimes, ok := val.(int); ok && i < maxFailTimes { @@ -701,11 +696,6 @@ func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 { failpoint.Continue() } }) - failpoint.Inject("delayLoad", func(val failpoint.Value) { - if sleepIntervalSeconds, ok := val.(int); ok && sleepIntervalSeconds > 0 { - time.Sleep(time.Duration(sleepIntervalSeconds) * time.Second) - } - }) watchStartRevision, err = lw.load(ctx) if err == nil { break @@ -754,7 +744,10 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision // make sure to wrap context with "WithRequireLeader". watcherCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(ctx)) watcherCancel = cancel - opts := append(lw.opts, clientv3.WithRev(revision), clientv3.WithProgressNotify()) + opts := []clientv3.OpOption{clientv3.WithRev(revision), clientv3.WithProgressNotify()} + if lw.isWithPrefix { + opts = append(opts, clientv3.WithPrefix()) + } done := make(chan struct{}) go grpcutil.CheckStream(watcherCtx, watcherCancel, done) watchChan := watcher.Watch(watcherCtx, lw.key, opts...) @@ -864,8 +857,14 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision } func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) { - ctx, cancel := context.WithTimeout(ctx, DefaultRequestTimeout) + ctx, cancel := context.WithTimeout(ctx, lw.loadTimeout) defer cancel() + failpoint.Inject("delayLoad", func(val failpoint.Value) { + if sleepIntervalSeconds, ok := val.(int); ok && sleepIntervalSeconds > 0 { + time.Sleep(time.Duration(sleepIntervalSeconds) * time.Second) + } + }) + startKey := lw.key // If limit is 0, it means no limit. // If limit is not 0, we need to add 1 to limit to get the next key. @@ -883,10 +882,22 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) zap.String("key", lw.key), zap.Error(err)) } }() + + // In most cases, 'Get(foo, WithPrefix())' is equivalent to 'Get(foo, WithRange(GetPrefixRangeEnd(foo))'. + // However, when the startKey changes, the two are no longer equivalent. + // For example, the end key for 'WithRange(GetPrefixRangeEnd(foo))' is consistently 'fop'. + // But when using 'Get(foo1, WithPrefix())', the end key becomes 'foo2', not 'fop'. + // So, we use 'WithRange()' to avoid this problem. + opts := []clientv3.OpOption{ + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), + clientv3.WithLimit(limit)} + if lw.isWithPrefix { + opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(startKey))) + } + for { // Sort by key to get the next key and we don't need to worry about the performance, // Because the default sort is just SortByKey and SortAscend - opts := append(lw.opts, clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), clientv3.WithLimit(limit)) resp, err := clientv3.NewKV(lw.client).Get(ctx, startKey, opts...) if err != nil { log.Error("load failed in watch loop", zap.String("name", lw.name), @@ -923,14 +934,14 @@ func (lw *LoopWatcher) ForceLoad() { // Two-phase locking is also used to let most of the requests return directly without acquiring // the write lock and causing the system to choke. lw.forceLoadMu.RLock() - if time.Since(lw.lastTimeForceLoad) < defaultForceLoadMinimalInterval { + if time.Since(lw.lastTimeForceLoad) < defaultEtcdRetryInterval { lw.forceLoadMu.RUnlock() return } lw.forceLoadMu.RUnlock() lw.forceLoadMu.Lock() - if time.Since(lw.lastTimeForceLoad) < defaultForceLoadMinimalInterval { + if time.Since(lw.lastTimeForceLoad) < defaultEtcdRetryInterval { lw.forceLoadMu.Unlock() return } diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index d415d2d1873..853e4e7aab9 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -399,12 +399,7 @@ func (suite *loopWatcherTestSuite) TearDownSuite() { func (suite *loopWatcherTestSuite) TestLoadWithoutKey() { re := suite.Require() - cache := struct { - syncutil.RWMutex - data map[string]struct{} - }{ - data: make(map[string]struct{}), - } + cache := make(map[string]struct{}) watcher := NewLoopWatcher( suite.ctx, &suite.wg, @@ -413,20 +408,17 @@ func (suite *loopWatcherTestSuite) TestLoadWithoutKey() { "TestLoadWithoutKey", func([]*clientv3.Event) error { return nil }, func(kv *mvccpb.KeyValue) error { - cache.Lock() - defer cache.Unlock() - cache.data[string(kv.Key)] = struct{}{} + cache[string(kv.Key)] = struct{}{} return nil }, func(kv *mvccpb.KeyValue) error { return nil }, func([]*clientv3.Event) error { return nil }, + false, /* withPrefix */ ) watcher.StartWatchLoop() err := watcher.WaitLoad() re.NoError(err) // although no key, watcher returns no error - cache.RLock() - defer cache.RUnlock() - re.Empty(cache.data) + re.Empty(cache) } func (suite *loopWatcherTestSuite) TestCallBack() { @@ -464,7 +456,7 @@ func (suite *loopWatcherTestSuite) TestCallBack() { result = result[:0] return nil }, - clientv3.WithPrefix(), + true, /* withPrefix */ ) watcher.StartWatchLoop() err := watcher.WaitLoad() @@ -499,12 +491,7 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLimit() { for i := 0; i < count; i++ { suite.put(re, fmt.Sprintf("TestWatcherLoadLimit%d", i), "") } - cache := struct { - syncutil.RWMutex - data []string - }{ - data: make([]string, 0), - } + cache := make([]string, 0) watcher := NewLoopWatcher( ctx, &suite.wg, @@ -513,9 +500,7 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLimit() { "TestWatcherLoadLimit", func([]*clientv3.Event) error { return nil }, func(kv *mvccpb.KeyValue) error { - cache.Lock() - defer cache.Unlock() - cache.data = append(cache.data, string(kv.Key)) + cache = append(cache, string(kv.Key)) return nil }, func(kv *mvccpb.KeyValue) error { @@ -524,19 +509,53 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLimit() { func([]*clientv3.Event) error { return nil }, - clientv3.WithPrefix(), + true, /* withPrefix */ ) + watcher.SetLoadBatchSize(int64(limit)) watcher.StartWatchLoop() err := watcher.WaitLoad() re.NoError(err) - cache.RLock() - re.Len(cache.data, count) - cache.RUnlock() + re.Len(cache, count) cancel() } } } +func (suite *loopWatcherTestSuite) TestWatcherLoadLargeKey() { + re := suite.Require() + // use default limit to test 16384 key in etcd + count := 16384 + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + for i := 0; i < count; i++ { + suite.put(re, fmt.Sprintf("TestWatcherLoadLargeKey/test-%d", i), "") + } + cache := make([]string, 0) + watcher := NewLoopWatcher( + ctx, + &suite.wg, + suite.client, + "test", + "TestWatcherLoadLargeKey", + func([]*clientv3.Event) error { return nil }, + func(kv *mvccpb.KeyValue) error { + cache = append(cache, string(kv.Key)) + return nil + }, + func(kv *mvccpb.KeyValue) error { + return nil + }, + func([]*clientv3.Event) error { + return nil + }, + true, /* withPrefix */ + ) + watcher.StartWatchLoop() + err := watcher.WaitLoad() + re.NoError(err) + re.Len(cache, count) +} + func (suite *loopWatcherTestSuite) TestWatcherBreak() { re := suite.Require() cache := struct { @@ -568,6 +587,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { }, func(kv *mvccpb.KeyValue) error { return nil }, func([]*clientv3.Event) error { return nil }, + false, /* withPrefix */ ) watcher.watchChangeRetryInterval = 100 * time.Millisecond watcher.StartWatchLoop() @@ -646,6 +666,7 @@ func (suite *loopWatcherTestSuite) TestWatcherRequestProgress() { func(kv *mvccpb.KeyValue) error { return nil }, func(kv *mvccpb.KeyValue) error { return nil }, func([]*clientv3.Event) error { return nil }, + false, /* withPrefix */ ) suite.wg.Add(1) diff --git a/server/api/operator.go b/server/api/operator.go index 7ff7d2d7c51..049a343d3bd 100644 --- a/server/api/operator.go +++ b/server/api/operator.go @@ -66,7 +66,8 @@ func (h *operatorHandler) GetOperatorsByRegion(w http.ResponseWriter, r *http.Re // @Tags operator // @Summary List pending operators. -// @Param kind query string false "Specify the operator kind." Enums(admin, leader, region) +// @Param kind query string false "Specify the operator kind." Enums(admin, leader, region) +// @Param object query bool false "Whether to return as JSON object." // @Produce json // @Success 200 {array} operator.Operator // @Failure 500 {string} string "PD server failed to proceed the request." @@ -78,6 +79,7 @@ func (h *operatorHandler) GetOperators(w http.ResponseWriter, r *http.Request) { ) kinds, ok := r.URL.Query()["kind"] + _, objectFlag := r.URL.Query()["object"] if !ok { results, err = h.Handler.GetOperators() } else { @@ -88,7 +90,15 @@ func (h *operatorHandler) GetOperators(w http.ResponseWriter, r *http.Request) { h.r.JSON(w, http.StatusInternalServerError, err.Error()) return } - h.r.JSON(w, http.StatusOK, results) + if objectFlag { + objResults := make([]*operator.OpObject, len(results)) + for i, op := range results { + objResults[i] = op.ToJSONObject() + } + h.r.JSON(w, http.StatusOK, objResults) + } else { + h.r.JSON(w, http.StatusOK, results) + } } // FIXME: details of input json body params diff --git a/server/keyspace_service.go b/server/keyspace_service.go index 11d912a5f54..09b935c2f84 100644 --- a/server/keyspace_service.go +++ b/server/keyspace_service.go @@ -116,7 +116,7 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques putFn, deleteFn, postEventsFn, - clientv3.WithRange(clientv3.GetPrefixRangeEnd(startKey)), + true, /* withPrefix */ ) watcher.StartWatchLoop() if err := watcher.WaitLoad(); err != nil { diff --git a/server/server.go b/server/server.go index e45be6b232f..5d930baae2b 100644 --- a/server/server.go +++ b/server/server.go @@ -2056,6 +2056,7 @@ func (s *Server) initServicePrimaryWatcher(serviceName string, primaryKey string putFn, deleteFn, func([]*clientv3.Event) error { return nil }, + false, /* withPrefix */ ) } diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go index b83703c377f..cd3f2ac34dc 100644 --- a/tests/server/api/operator_test.go +++ b/tests/server/api/operator_test.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "net/http" + "sort" "strconv" "strings" "testing" @@ -509,6 +510,108 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te } } +func (suite *operatorTestSuite) TestGetOperatorsAsObject() { + // use a new environment to avoid being affected by other tests + env := tests.NewSchedulingTestEnvironment(suite.T(), + func(conf *config.Config, serverName string) { + conf.Replication.MaxReplicas = 1 + }) + env.RunTestInTwoModes(suite.checkGetOperatorsAsObject) + env.Cleanup() +} + +func (suite *operatorTestSuite) checkGetOperatorsAsObject(cluster *tests.TestCluster) { + re := suite.Require() + suite.pauseRuleChecker(re, cluster) + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 3, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }, + } + + for _, store := range stores { + tests.MustPutStore(re, cluster, store) + } + + urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr()) + objURL := fmt.Sprintf("%s/operators?object=1", urlPrefix) + resp := make([]operator.OpObject, 0) + + // No operator. + err := tu.ReadGetJSON(re, testDialClient, objURL, &resp) + re.NoError(err) + re.Empty(resp) + + // Merge operator. + r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1)) + tests.MustPutRegionInfo(re, cluster, r1) + r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2), core.SetRegionVersion(3)) + tests.MustPutRegionInfo(re, cluster, r2) + r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte("d"), core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3), core.SetRegionVersion(2)) + tests.MustPutRegionInfo(re, cluster, r3) + + err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 10, "target_region_id": 20}`), tu.StatusOK(re)) + re.NoError(err) + err = tu.ReadGetJSON(re, testDialClient, objURL, &resp) + re.NoError(err) + re.Len(resp, 2) + less := func(i, j int) bool { + return resp[i].RegionID < resp[j].RegionID + } + sort.Slice(resp, less) + re.Equal(uint64(10), resp[0].RegionID) + re.Equal("admin-merge-region", resp[0].Desc) + re.Equal("merge: region 10 to 20", resp[0].Brief) + re.Equal("10m0s", resp[0].Timeout) + re.Equal(&metapb.RegionEpoch{ + ConfVer: 1, + Version: 1, + }, resp[0].RegionEpoch) + re.Equal(operator.OpAdmin|operator.OpMerge, resp[0].Kind) + re.Truef(resp[0].Status == operator.CREATED || resp[0].Status == operator.STARTED, "unexpected status %s", resp[0].Status) + re.Equal(uint64(20), resp[1].RegionID) + re.Equal("admin-merge-region", resp[1].Desc) + + // Add peer operator. + peer1 := &metapb.Peer{Id: 100, StoreId: 1} + peer2 := &metapb.Peer{Id: 200, StoreId: 2} + region := &metapb.Region{ + Id: 40, + Peers: []*metapb.Peer{peer1, peer2}, + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 1, + Version: 1, + }, + StartKey: []byte("d"), + EndKey: []byte(""), + } + regionInfo := core.NewRegionInfo(region, peer1) + tests.MustPutRegionInfo(re, cluster, regionInfo) + err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"add-peer", "region_id": 40, "store_id": 3}`), tu.StatusOK(re)) + re.NoError(err) + err = tu.ReadGetJSON(re, testDialClient, objURL, &resp) + re.NoError(err) + re.Len(resp, 3) + sort.Slice(resp, less) + re.Equal(uint64(40), resp[2].RegionID) + re.Equal("admin-add-peer", resp[2].Desc) +} + // pauseRuleChecker will pause rule checker to avoid unexpected operator. func (suite *operatorTestSuite) pauseRuleChecker(re *require.Assertions, cluster *tests.TestCluster) { checkerName := "rule" diff --git a/tools/pd-api-bench/cases/cases.go b/tools/pd-api-bench/cases/cases.go index 79d2ce873c5..59ea6337115 100644 --- a/tools/pd-api-bench/cases/cases.go +++ b/tools/pd-api-bench/cases/cases.go @@ -54,6 +54,24 @@ func InitCluster(ctx context.Context, cli pd.Client, httpCli pdHttp.Client) erro return nil } +// Config is the configuration for the case. +type Config struct { + QPS int64 `toml:"qps" json:"qps"` + Burst int64 `toml:"burst" json:"burst"` +} + +func newConfig() *Config { + return &Config{ + Burst: 1, + } +} + +// Clone returns a cloned configuration. +func (c *Config) Clone() *Config { + cfg := *c + return &cfg +} + // Case is the interface for all cases. type Case interface { Name() string @@ -61,12 +79,12 @@ type Case interface { GetQPS() int64 SetBurst(int64) GetBurst() int64 + GetConfig() *Config } type baseCase struct { - name string - qps int64 - burst int64 + name string + cfg *Config } func (c *baseCase) Name() string { @@ -74,19 +92,23 @@ func (c *baseCase) Name() string { } func (c *baseCase) SetQPS(qps int64) { - c.qps = qps + c.cfg.QPS = qps } func (c *baseCase) GetQPS() int64 { - return c.qps + return c.cfg.QPS } func (c *baseCase) SetBurst(burst int64) { - c.burst = burst + c.cfg.Burst = burst } func (c *baseCase) GetBurst() int64 { - return c.burst + return c.cfg.Burst +} + +func (c *baseCase) GetConfig() *Config { + return c.cfg.Clone() } // GRPCCase is the interface for all gRPC cases. @@ -100,11 +122,12 @@ type GRPCCraeteFn func() GRPCCase // GRPCCaseFnMap is the map for all gRPC case creation function. var GRPCCaseFnMap = map[string]GRPCCraeteFn{ - "GetRegion": newGetRegion(), - "GetStore": newGetStore(), - "GetStores": newGetStores(), - "ScanRegions": newScanRegions(), - "Tso": newTso(), + "GetRegion": newGetRegion(), + "GetRegionEnableFollower": newGetRegionEnableFollower(), + "GetStore": newGetStore(), + "GetStores": newGetStores(), + "ScanRegions": newScanRegions(), + "Tso": newTso(), } // GRPCCaseMap is the map for all gRPC case creation function. @@ -136,9 +159,8 @@ func newMinResolvedTS() func() HTTPCase { return func() HTTPCase { return &minResolvedTS{ baseCase: &baseCase{ - name: "GetMinResolvedTS", - qps: 1000, - burst: 1, + name: "GetMinResolvedTS", + cfg: newConfig(), }, } } @@ -164,9 +186,8 @@ func newRegionStats() func() HTTPCase { return func() HTTPCase { return ®ionsStats{ baseCase: &baseCase{ - name: "GetRegionStatus", - qps: 100, - burst: 1, + name: "GetRegionStatus", + cfg: newConfig(), }, regionSample: 1000, } @@ -200,9 +221,8 @@ func newGetRegion() func() GRPCCase { return func() GRPCCase { return &getRegion{ baseCase: &baseCase{ - name: "GetRegion", - qps: 10000, - burst: 1, + name: "GetRegion", + cfg: newConfig(), }, } } @@ -217,6 +237,30 @@ func (c *getRegion) Unary(ctx context.Context, cli pd.Client) error { return nil } +type getRegionEnableFollower struct { + *baseCase +} + +func newGetRegionEnableFollower() func() GRPCCase { + return func() GRPCCase { + return &getRegionEnableFollower{ + baseCase: &baseCase{ + name: "GetRegionEnableFollower", + cfg: newConfig(), + }, + } + } +} + +func (c *getRegionEnableFollower) Unary(ctx context.Context, cli pd.Client) error { + id := rand.Intn(totalRegion)*4 + 1 + _, err := cli.GetRegion(ctx, generateKeyForSimulator(id, 56), pd.WithAllowFollowerHandle()) + if err != nil { + return err + } + return nil +} + type scanRegions struct { *baseCase regionSample int @@ -226,9 +270,8 @@ func newScanRegions() func() GRPCCase { return func() GRPCCase { return &scanRegions{ baseCase: &baseCase{ - name: "ScanRegions", - qps: 10000, - burst: 1, + name: "ScanRegions", + cfg: newConfig(), }, regionSample: 10000, } @@ -255,9 +298,8 @@ func newTso() func() GRPCCase { return func() GRPCCase { return &tso{ baseCase: &baseCase{ - name: "Tso", - qps: 10000, - burst: 1, + name: "Tso", + cfg: newConfig(), }, } } @@ -279,9 +321,8 @@ func newGetStore() func() GRPCCase { return func() GRPCCase { return &getStore{ baseCase: &baseCase{ - name: "GetStore", - qps: 10000, - burst: 1, + name: "GetStore", + cfg: newConfig(), }, } } @@ -304,9 +345,8 @@ func newGetStores() func() GRPCCase { return func() GRPCCase { return &getStores{ baseCase: &baseCase{ - name: "GetStores", - qps: 10000, - burst: 1, + name: "GetStores", + cfg: newConfig(), }, } } diff --git a/tools/pd-api-bench/cases/controller.go b/tools/pd-api-bench/cases/controller.go new file mode 100644 index 00000000000..2a4561a3d2a --- /dev/null +++ b/tools/pd-api-bench/cases/controller.go @@ -0,0 +1,268 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cases + +import ( + "context" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + pd "github.com/tikv/pd/client" + pdHttp "github.com/tikv/pd/client/http" + "go.uber.org/zap" +) + +var base = int64(time.Second) / int64(time.Microsecond) + +// Coordinator managers the operation of the gRPC and HTTP case. +type Coordinator struct { + ctx context.Context + + httpClients []pdHttp.Client + gRPCClients []pd.Client + + http map[string]*httpController + grpc map[string]*gRPCController + + mu sync.RWMutex +} + +// NewCoordinator returns a new coordinator. +func NewCoordinator(ctx context.Context, httpClients []pdHttp.Client, gRPCClients []pd.Client) *Coordinator { + return &Coordinator{ + ctx: ctx, + httpClients: httpClients, + gRPCClients: gRPCClients, + http: make(map[string]*httpController), + grpc: make(map[string]*gRPCController), + } +} + +// GetHTTPCase returns the HTTP case config. +func (c *Coordinator) GetHTTPCase(name string) (*Config, error) { + c.mu.RLock() + defer c.mu.RUnlock() + if controller, ok := c.http[name]; ok { + return controller.GetConfig(), nil + } + return nil, errors.Errorf("case %v does not exist.", name) +} + +// GetGRPCCase returns the gRPC case config. +func (c *Coordinator) GetGRPCCase(name string) (*Config, error) { + c.mu.RLock() + defer c.mu.RUnlock() + if controller, ok := c.grpc[name]; ok { + return controller.GetConfig(), nil + } + return nil, errors.Errorf("case %v does not exist.", name) +} + +// GetAllHTTPCases returns the all HTTP case configs. +func (c *Coordinator) GetAllHTTPCases() map[string]*Config { + c.mu.RLock() + defer c.mu.RUnlock() + ret := make(map[string]*Config) + for name, c := range c.http { + ret[name] = c.GetConfig() + } + return ret +} + +// GetAllGRPCCases returns the all gRPC case configs. +func (c *Coordinator) GetAllGRPCCases() map[string]*Config { + c.mu.RLock() + defer c.mu.RUnlock() + ret := make(map[string]*Config) + for name, c := range c.grpc { + ret[name] = c.GetConfig() + } + return ret +} + +// SetHTTPCase sets the config for the specific case. +func (c *Coordinator) SetHTTPCase(name string, cfg *Config) error { + c.mu.Lock() + defer c.mu.Unlock() + if fn, ok := HTTPCaseFnMap[name]; ok { + var controller *httpController + if controller, ok = c.http[name]; !ok { + controller = newHTTPController(c.ctx, c.httpClients, fn) + c.http[name] = controller + } + controller.stop() + controller.SetQPS(cfg.QPS) + if cfg.Burst > 0 { + controller.SetBurst(cfg.Burst) + } + controller.run() + } else { + return errors.Errorf("HTTP case %s not implemented", name) + } + return nil +} + +// SetGRPCCase sets the config for the specific case. +func (c *Coordinator) SetGRPCCase(name string, cfg *Config) error { + c.mu.Lock() + defer c.mu.Unlock() + if fn, ok := GRPCCaseFnMap[name]; ok { + var controller *gRPCController + if controller, ok = c.grpc[name]; !ok { + controller = newGRPCController(c.ctx, c.gRPCClients, fn) + c.grpc[name] = controller + } + controller.stop() + controller.SetQPS(cfg.QPS) + if cfg.Burst > 0 { + controller.SetBurst(cfg.Burst) + } + controller.run() + } else { + return errors.Errorf("HTTP case %s not implemented", name) + } + return nil +} + +type httpController struct { + HTTPCase + clients []pdHttp.Client + pctx context.Context + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +func newHTTPController(ctx context.Context, clis []pdHttp.Client, fn HTTPCraeteFn) *httpController { + c := &httpController{ + pctx: ctx, + clients: clis, + HTTPCase: fn(), + } + return c +} + +// run tries to run the HTTP api bench. +func (c *httpController) run() { + if c.GetQPS() <= 0 || c.cancel != nil { + return + } + c.ctx, c.cancel = context.WithCancel(c.pctx) + qps := c.GetQPS() + burst := c.GetBurst() + cliNum := int64(len(c.clients)) + tt := time.Duration(base/qps*burst*cliNum) * time.Microsecond + log.Info("begin to run http case", zap.String("case", c.Name()), zap.Int64("qps", qps), zap.Int64("burst", burst), zap.Duration("interval", tt)) + for _, hCli := range c.clients { + c.wg.Add(1) + go func(hCli pdHttp.Client) { + defer c.wg.Done() + var ticker = time.NewTicker(tt) + defer ticker.Stop() + for { + select { + case <-ticker.C: + for i := int64(0); i < burst; i++ { + err := c.Do(c.ctx, hCli) + if err != nil { + log.Error("meet erorr when doing HTTP request", zap.String("case", c.Name()), zap.Error(err)) + } + } + case <-c.ctx.Done(): + log.Info("Got signal to exit running HTTP case") + return + } + } + }(hCli) + } +} + +// stop stops the HTTP api bench. +func (c *httpController) stop() { + if c.cancel == nil { + return + } + c.cancel() + c.cancel = nil + c.wg.Wait() +} + +type gRPCController struct { + GRPCCase + clients []pd.Client + pctx context.Context + + ctx context.Context + cancel context.CancelFunc + + wg sync.WaitGroup +} + +func newGRPCController(ctx context.Context, clis []pd.Client, fn GRPCCraeteFn) *gRPCController { + c := &gRPCController{ + pctx: ctx, + clients: clis, + GRPCCase: fn(), + } + return c +} + +// run tries to run the gRPC api bench. +func (c *gRPCController) run() { + if c.GetQPS() <= 0 || c.cancel != nil { + return + } + c.ctx, c.cancel = context.WithCancel(c.pctx) + qps := c.GetQPS() + burst := c.GetBurst() + cliNum := int64(len(c.clients)) + tt := time.Duration(base/qps*burst*cliNum) * time.Microsecond + log.Info("begin to run gRPC case", zap.String("case", c.Name()), zap.Int64("qps", qps), zap.Int64("burst", burst), zap.Duration("interval", tt)) + for _, cli := range c.clients { + c.wg.Add(1) + go func(cli pd.Client) { + defer c.wg.Done() + var ticker = time.NewTicker(tt) + defer ticker.Stop() + for { + select { + case <-ticker.C: + for i := int64(0); i < burst; i++ { + err := c.Unary(c.ctx, cli) + if err != nil { + log.Error("meet erorr when doing gRPC request", zap.String("case", c.Name()), zap.Error(err)) + } + } + case <-c.ctx.Done(): + log.Info("Got signal to exit running gRPC case") + return + } + } + }(cli) + } +} + +// stop stops the gRPC api bench. +func (c *gRPCController) stop() { + if c.cancel == nil { + return + } + c.cancel() + c.cancel = nil + c.wg.Wait() +} diff --git a/tools/pd-api-bench/config/config.go b/tools/pd-api-bench/config/config.go index 783a7ee463a..8898c0e3083 100644 --- a/tools/pd-api-bench/config/config.go +++ b/tools/pd-api-bench/config/config.go @@ -29,6 +29,7 @@ type Config struct { flagSet *flag.FlagSet configFile string PDAddr string `toml:"pd" json:"pd"` + StatusAddr string `toml:"status" json:"status"` Log log.Config `toml:"log" json:"log"` Logger *zap.Logger @@ -42,13 +43,8 @@ type Config struct { KeyPath string `toml:"key-path" json:"key-path"` // only for init - HTTP map[string]caseConfig `toml:"http" json:"http"` - GRPC map[string]caseConfig `toml:"grpc" json:"grpc"` -} - -type caseConfig struct { - QPS int64 `toml:"qps" json:"qps"` - Burst int64 `toml:"burst" json:"burst"` + HTTP map[string]cases.Config `toml:"http" json:"http"` + GRPC map[string]cases.Config `toml:"grpc" json:"grpc"` } // NewConfig return a set of settings. @@ -58,6 +54,8 @@ func NewConfig(flagSet *flag.FlagSet) *Config { fs := cfg.flagSet fs.StringVar(&cfg.configFile, "config", "", "config file") fs.StringVar(&cfg.PDAddr, "pd", "http://127.0.0.1:2379", "pd address") + fs.StringVar(&cfg.Log.File.Filename, "log-file", "", "log file path") + fs.StringVar(&cfg.StatusAddr, "status", "127.0.0.1:10081", "status address") fs.Int64Var(&cfg.Client, "client", 1, "client number") fs.StringVar(&cfg.CaPath, "cacert", "", "path of file that contains list of trusted SSL CAs") fs.StringVar(&cfg.CertPath, "cert", "", "path of file that contains X509 certificate in PEM format") @@ -93,43 +91,23 @@ func (c *Config) Parse(arguments []string) error { return errors.Errorf("'%s' is an invalid flag", c.flagSet.Arg(0)) } + return nil +} + +// InitCoordinator set case config from config itself. +func (c *Config) InitCoordinator(co *cases.Coordinator) { for name, cfg := range c.HTTP { - if fn, ok := cases.HTTPCaseFnMap[name]; ok { - var cas cases.HTTPCase - if cas, ok = cases.HTTPCaseMap[name]; !ok { - cas = fn() - cases.HTTPCaseMap[name] = cas - } - if cfg.QPS > 0 { - cas.SetQPS(cfg.QPS) - } - if cfg.Burst > 0 { - cas.SetBurst(cfg.Burst) - } - } else { - log.Warn("HTTP case not implemented", zap.String("case", name)) + err := co.SetHTTPCase(name, &cfg) + if err != nil { + log.Error("create HTTP case failed", zap.Error(err)) } } - for name, cfg := range c.GRPC { - if fn, ok := cases.GRPCCaseFnMap[name]; ok { - var cas cases.GRPCCase - if cas, ok = cases.GRPCCaseMap[name]; !ok { - cas = fn() - cases.GRPCCaseMap[name] = cas - } - if cfg.QPS > 0 { - cas.SetQPS(cfg.QPS) - } - if cfg.Burst > 0 { - cas.SetBurst(cfg.Burst) - } - } else { - log.Warn("gRPC case not implemented", zap.String("case", name)) + err := co.SetGRPCCase(name, &cfg) + if err != nil { + log.Error("create gRPC case failed", zap.Error(err)) } } - - return nil } // Adjust is used to adjust configurations diff --git a/tools/pd-api-bench/main.go b/tools/pd-api-bench/main.go index 56e7ee761b2..681c3579012 100644 --- a/tools/pd-api-bench/main.go +++ b/tools/pd-api-bench/main.go @@ -17,6 +17,7 @@ package main import ( "context" "crypto/tls" + "net/http" "os" "os/signal" "strconv" @@ -24,12 +25,18 @@ import ( "syscall" "time" + "github.com/gin-contrib/cors" + "github.com/gin-contrib/gzip" + "github.com/gin-contrib/pprof" + "github.com/gin-gonic/gin" "github.com/pingcap/log" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" flag "github.com/spf13/pflag" pd "github.com/tikv/pd/client" pdHttp "github.com/tikv/pd/client/http" "github.com/tikv/pd/client/tlsutil" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/tools/pd-api-bench/cases" "github.com/tikv/pd/tools/pd-api-bench/config" @@ -39,18 +46,40 @@ import ( ) var ( - qps = flag.Int64("qps", 1000, "qps") - burst = flag.Int64("burst", 1, "burst") - - httpCases = flag.String("http-cases", "", "http api cases") - gRPCCases = flag.String("grpc-cases", "", "grpc cases") + qps, burst int64 + httpCases, gRPCCases string ) -var base = int64(time.Second) / int64(time.Microsecond) +var ( + pdAPIExecutionHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "pd", + Subsystem: "api_bench", + Name: "pd_api_execution_duration_seconds", + Help: "Bucketed histogram of all pd api execution time (s)", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms ~ 524s + }, []string{"type"}) + + pdAPIRequestCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "api_bench", + Name: "pd_api_request_total", + Help: "Counter of the pd http api requests", + }, []string{"type", "result"}) +) func main() { + prometheus.MustRegister(pdAPIExecutionHistogram) + prometheus.MustRegister(pdAPIRequestCounter) + + ctx, cancel := context.WithCancel(context.Background()) flagSet := flag.NewFlagSet("api-bench", flag.ContinueOnError) flagSet.ParseErrorsWhitelist.UnknownFlags = true + flagSet.Int64Var(&qps, "qps", 1, "qps") + flagSet.Int64Var(&burst, "burst", 1, "burst") + flagSet.StringVar(&httpCases, "http-cases", "", "http api cases") + flagSet.StringVar(&gRPCCases, "grpc-cases", "", "grpc cases") cfg := config.NewConfig(flagSet) err := cfg.Parse(os.Args[1:]) defer logutil.LogPanic() @@ -68,7 +97,6 @@ func main() { } else { log.Fatal("initialize logger error", zap.Error(err)) } - ctx, cancel := context.WithCancel(context.Background()) sc := make(chan os.Signal, 1) signal.Notify(sc, syscall.SIGHUP, @@ -82,75 +110,6 @@ func main() { cancel() }() - hcaseStr := strings.Split(*httpCases, ",") - for _, str := range hcaseStr { - caseQPS := int64(0) - caseBurst := int64(0) - cStr := "" - - strs := strings.Split(str, "-") - // to get case name - strsa := strings.Split(strs[0], "+") - cStr = strsa[0] - // to get case Burst - if len(strsa) > 1 { - caseBurst, err = strconv.ParseInt(strsa[1], 10, 64) - if err != nil { - log.Error("parse burst failed for case", zap.String("case", cStr), zap.String("config", strsa[1])) - } - } - // to get case qps - if len(strs) > 1 { - strsb := strings.Split(strs[1], "+") - caseQPS, err = strconv.ParseInt(strsb[0], 10, 64) - if err != nil { - if err != nil { - log.Error("parse qps failed for case", zap.String("case", cStr), zap.String("config", strsb[0])) - } - } - // to get case Burst - if len(strsb) > 1 { - caseBurst, err = strconv.ParseInt(strsb[1], 10, 64) - if err != nil { - log.Error("parse burst failed for case", zap.String("case", cStr), zap.String("config", strsb[1])) - } - } - } - if len(cStr) == 0 { - continue - } - if fn, ok := cases.HTTPCaseFnMap[cStr]; ok { - var cas cases.HTTPCase - if cas, ok = cases.HTTPCaseMap[cStr]; !ok { - cas = fn() - cases.HTTPCaseMap[cStr] = cas - } - if caseBurst > 0 { - cas.SetBurst(caseBurst) - } else if *burst > 0 { - cas.SetBurst(*burst) - } - if caseQPS > 0 { - cas.SetQPS(caseQPS) - } else if *qps > 0 { - cas.SetQPS(*qps) - } - } else { - log.Warn("HTTP case not implemented", zap.String("case", cStr)) - } - } - gcaseStr := strings.Split(*gRPCCases, ",") - // todo: see pull 7345 - for _, str := range gcaseStr { - if fn, ok := cases.GRPCCaseFnMap[str]; ok { - if _, ok = cases.GRPCCaseMap[str]; !ok { - cases.GRPCCaseMap[str] = fn() - } - } else { - log.Warn("gRPC case not implemented", zap.String("case", str)) - } - } - if cfg.Client == 0 { log.Error("concurrency == 0, exit") return @@ -158,23 +117,39 @@ func main() { pdClis := make([]pd.Client, cfg.Client) for i := int64(0); i < cfg.Client; i++ { pdClis[i] = newPDClient(ctx, cfg) + pdClis[i].UpdateOption(pd.EnableFollowerHandle, true) } httpClis := make([]pdHttp.Client, cfg.Client) for i := int64(0); i < cfg.Client; i++ { sd := pdClis[i].GetServiceDiscovery() - httpClis[i] = pdHttp.NewClientWithServiceDiscovery("tools-api-bench", sd, pdHttp.WithTLSConfig(loadTLSConfig(cfg))) + httpClis[i] = pdHttp.NewClientWithServiceDiscovery("tools-api-bench", sd, pdHttp.WithTLSConfig(loadTLSConfig(cfg)), pdHttp.WithMetrics(pdAPIRequestCounter, pdAPIExecutionHistogram)) } err = cases.InitCluster(ctx, pdClis[0], httpClis[0]) if err != nil { log.Fatal("InitCluster error", zap.Error(err)) } - for _, hcase := range cases.HTTPCaseMap { - handleHTTPCase(ctx, hcase, httpClis) + coordinator := cases.NewCoordinator(ctx, httpClis, pdClis) + + hcaseStr := strings.Split(httpCases, ",") + for _, str := range hcaseStr { + name, cfg := parseCaseNameAndConfig(str) + if len(name) == 0 { + continue + } + coordinator.SetHTTPCase(name, cfg) } - for _, gcase := range cases.GRPCCaseMap { - handleGRPCCase(ctx, gcase, pdClis) + gcaseStr := strings.Split(gRPCCases, ",") + for _, str := range gcaseStr { + name, cfg := parseCaseNameAndConfig(str) + if len(name) == 0 { + continue + } + coordinator.SetGRPCCase(name, cfg) } + cfg.InitCoordinator(coordinator) + + go runHTTPServer(cfg, coordinator) <-ctx.Done() for _, cli := range pdClis { @@ -192,64 +167,144 @@ func main() { } } -func handleGRPCCase(ctx context.Context, gcase cases.GRPCCase, clients []pd.Client) { - qps := gcase.GetQPS() - burst := gcase.GetBurst() - cliNum := int64(len(clients)) - tt := time.Duration(base/qps*burst*cliNum) * time.Microsecond - log.Info("begin to run gRPC case", zap.String("case", gcase.Name()), zap.Int64("qps", qps), zap.Int64("burst", burst), zap.Duration("interval", tt)) - for _, cli := range clients { - go func(cli pd.Client) { - var ticker = time.NewTicker(tt) - defer ticker.Stop() - for { - select { - case <-ticker.C: - for i := int64(0); i < burst; i++ { - err := gcase.Unary(ctx, cli) - if err != nil { - log.Error("meet erorr when doing gRPC request", zap.String("case", gcase.Name()), zap.Error(err)) - } - } - case <-ctx.Done(): - log.Info("Got signal to exit handleGetRegion") - return - } +func exit(code int) { + os.Exit(code) +} + +func parseCaseNameAndConfig(str string) (string, *cases.Config) { + var err error + cfg := &cases.Config{} + name := "" + strs := strings.Split(str, "-") + // to get case name + strsa := strings.Split(strs[0], "+") + name = strsa[0] + // to get case Burst + if len(strsa) > 1 { + cfg.Burst, err = strconv.ParseInt(strsa[1], 10, 64) + if err != nil { + log.Error("parse burst failed for case", zap.String("case", name), zap.String("config", strsa[1])) + } + } + // to get case qps + if len(strs) > 1 { + strsb := strings.Split(strs[1], "+") + cfg.QPS, err = strconv.ParseInt(strsb[0], 10, 64) + if err != nil { + if err != nil { + log.Error("parse qps failed for case", zap.String("case", name), zap.String("config", strsb[0])) + } + } + // to get case Burst + if len(strsb) > 1 { + cfg.Burst, err = strconv.ParseInt(strsb[1], 10, 64) + if err != nil { + log.Error("parse burst failed for case", zap.String("case", name), zap.String("config", strsb[1])) } - }(cli) + } + } + if cfg.QPS == 0 && qps > 0 { + cfg.QPS = qps + } + if cfg.Burst == 0 && burst > 0 { + cfg.Burst = burst } + return name, cfg } -func handleHTTPCase(ctx context.Context, hcase cases.HTTPCase, httpClis []pdHttp.Client) { - qps := hcase.GetQPS() - burst := hcase.GetBurst() - cliNum := int64(len(httpClis)) - tt := time.Duration(base/qps*burst*cliNum) * time.Microsecond - log.Info("begin to run http case", zap.String("case", hcase.Name()), zap.Int64("qps", qps), zap.Int64("burst", burst), zap.Duration("interval", tt)) - for _, hCli := range httpClis { - go func(hCli pdHttp.Client) { - var ticker = time.NewTicker(tt) - defer ticker.Stop() - for { - select { - case <-ticker.C: - for i := int64(0); i < burst; i++ { - err := hcase.Do(ctx, hCli) - if err != nil { - log.Error("meet erorr when doing HTTP request", zap.String("case", hcase.Name()), zap.Error(err)) - } - } - case <-ctx.Done(): - log.Info("Got signal to exit handleScanRegions") - return - } +func runHTTPServer(cfg *config.Config, co *cases.Coordinator) { + gin.SetMode(gin.ReleaseMode) + engine := gin.New() + engine.Use(gin.Recovery()) + engine.Use(cors.Default()) + engine.Use(gzip.Gzip(gzip.DefaultCompression)) + engine.GET("metrics", utils.PromHandler()) + // profile API + pprof.Register(engine) + + getCfg := func(c *gin.Context) *cases.Config { + var err error + cfg := &cases.Config{} + qpsStr := c.Query("qps") + if len(qpsStr) > 0 { + cfg.QPS, err = strconv.ParseInt(qpsStr, 10, 64) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + } + } + burstStr := c.Query("burst") + if len(burstStr) > 0 { + cfg.Burst, err = strconv.ParseInt(burstStr, 10, 64) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) } - }(hCli) + } + return cfg } -} -func exit(code int) { - os.Exit(code) + engine.POST("config/http/all", func(c *gin.Context) { + var input map[string]cases.Config + if err := c.ShouldBindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + for name, cfg := range input { + co.SetHTTPCase(name, &cfg) + } + c.String(http.StatusOK, "") + }) + engine.POST("config/http/:name", func(c *gin.Context) { + name := c.Param("name") + cfg := getCfg(c) + co.SetHTTPCase(name, cfg) + c.String(http.StatusOK, "") + }) + engine.POST("config/grpc/all", func(c *gin.Context) { + var input map[string]cases.Config + if err := c.ShouldBindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + for name, cfg := range input { + co.SetGRPCCase(name, &cfg) + } + c.String(http.StatusOK, "") + }) + engine.POST("config/grpc/:name", func(c *gin.Context) { + name := c.Param("name") + cfg := getCfg(c) + co.SetGRPCCase(name, cfg) + c.String(http.StatusOK, "") + }) + + engine.GET("config/http/all", func(c *gin.Context) { + all := co.GetAllHTTPCases() + c.IndentedJSON(http.StatusOK, all) + }) + engine.GET("config/http/:name", func(c *gin.Context) { + name := c.Param("name") + cfg, err := co.GetHTTPCase(name) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, cfg) + }) + engine.GET("config/grpc/all", func(c *gin.Context) { + all := co.GetAllGRPCCases() + c.IndentedJSON(http.StatusOK, all) + }) + engine.GET("config/grpc/:name", func(c *gin.Context) { + name := c.Param("name") + cfg, err := co.GetGRPCCase(name) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, cfg) + }) + // nolint + engine.Run(cfg.StatusAddr) } func trimHTTPPrefix(str string) string { diff --git a/tools/pd-heartbeat-bench/config/config.go b/tools/pd-heartbeat-bench/config/config.go index be669c00666..74c8159ced9 100644 --- a/tools/pd-heartbeat-bench/config/config.go +++ b/tools/pd-heartbeat-bench/config/config.go @@ -59,7 +59,8 @@ func NewConfig() *Config { fs := cfg.flagSet fs.ParseErrorsWhitelist.UnknownFlags = true fs.StringVar(&cfg.configFile, "config", "", "config file") - fs.StringVar(&cfg.PDAddr, "pd", "127.0.0.1:2379", "pd address") + fs.StringVar(&cfg.PDAddr, "pd-endpoints", "127.0.0.1:2379", "pd address") + fs.StringVar(&cfg.Log.File.Filename, "log-file", "", "log file path") fs.StringVar(&cfg.StatusAddr, "status-addr", "127.0.0.1:20180", "status address") return cfg