Merge branch 'master' into add-switch
ti-chi-bot[bot] authored Jan 17, 2024
2 parents cb367e7 + 6472313 commit ee9014b
Showing 20 changed files with 838 additions and 275 deletions.
3 changes: 1 addition & 2 deletions pkg/keyspace/tso_keyspace_group.go
@@ -216,7 +216,6 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {

func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID uint64) {
tsoServiceKey := discovery.TSOPath(clusterID)
tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey)

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
@@ -249,7 +248,7 @@ func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID ui
putFn,
deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithRange(tsoServiceEndKey),
true, /* withPrefix */
)
}

14 changes: 12 additions & 2 deletions pkg/mcs/scheduling/server/apis/v1/api.go
@@ -324,7 +324,8 @@ func getOperatorByRegion(c *gin.Context) {

// @Tags operators
// @Summary List operators.
// @Param kind query string false "Specify the operator kind." Enums(admin, leader, region, waiting)
// @Param kind query string false "Specify the operator kind." Enums(admin, leader, region, waiting)
// @Param object query bool false "Whether to return as JSON object."
// @Produce json
// @Success 200 {array} operator.Operator
// @Failure 500 {string} string "PD server failed to proceed the request."
@@ -337,6 +338,7 @@ func getOperators(c *gin.Context) {
)

kinds := c.QueryArray("kind")
_, objectFlag := c.GetQuery("object")
if len(kinds) == 0 {
results, err = handler.GetOperators()
} else {
@@ -347,7 +349,15 @@ func getOperators(c *gin.Context) {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, results)
if objectFlag {
objResults := make([]*operator.OpObject, len(results))
for i, op := range results {
objResults[i] = op.ToJSONObject()
}
c.IndentedJSON(http.StatusOK, objResults)
} else {
c.IndentedJSON(http.StatusOK, results)
}
}

// @Tags operator
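The new object flag switches this endpoint's response from the string form produced by Operator.MarshalJSON to an array of operator.OpObject. A minimal client sketch, assuming a scheduling service reachable at 127.0.0.1:3379 with the /scheduling/api/v1 path prefix (address and prefix are illustrative, not taken from this diff):

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// With ?object=true the handler returns one JSON object per operator
	// (desc, brief, region_id, region_epoch, kind, timeout, status)
	// instead of one string per operator.
	resp, err := http.Get("http://127.0.0.1:3379/scheduling/api/v1/operators?object=true")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}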
5 changes: 3 additions & 2 deletions pkg/mcs/scheduling/server/config/watcher.go
@@ -146,6 +146,7 @@ func (cw *Watcher) initializeConfigWatcher() error {
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
false, /* withPrefix */
)
cw.configWatcher.StartWatchLoop()
return cw.configWatcher.WaitLoad()
@@ -176,7 +177,7 @@ func (cw *Watcher) initializeTTLConfigWatcher() error {
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
true, /* withPrefix */
)
cw.ttlConfigWatcher.StartWatchLoop()
return cw.ttlConfigWatcher.WaitLoad()
@@ -217,7 +218,7 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
true, /* withPrefix */
)
cw.schedulerConfigWatcher.StartWatchLoop()
return cw.schedulerConfigWatcher.WaitLoad()
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/meta/watcher.go
@@ -111,7 +111,7 @@ func (w *Watcher) initializeStoreWatcher() error {
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
true, /* withPrefix */
)
w.storeWatcher.StartWatchLoop()
return w.storeWatcher.WaitLoad()
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/rule/watcher.go
@@ -204,7 +204,7 @@ func (rw *Watcher) initializeRuleWatcher() error {
preEventsFn,
putFn, deleteFn,
postEventsFn,
clientv3.WithPrefix(),
true, /* withPrefix */
)
rw.ruleWatcher.StartWatchLoop()
return rw.ruleWatcher.WaitLoad()
@@ -232,7 +232,7 @@ func (rw *Watcher) initializeRegionLabelWatcher() error {
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
true, /* withPrefix */
)
rw.labelWatcher.StartWatchLoop()
return rw.labelWatcher.WaitLoad()
33 changes: 33 additions & 0 deletions pkg/schedule/operator/operator.go
@@ -149,6 +149,39 @@ func (o *Operator) MarshalJSON() ([]byte, error) {
return []byte(`"` + o.String() + `"`), nil
}

// OpObject is used to return Operator as a json object for API.
type OpObject struct {
Desc string `json:"desc"`
Brief string `json:"brief"`
RegionID uint64 `json:"region_id"`
RegionEpoch *metapb.RegionEpoch `json:"region_epoch"`
Kind OpKind `json:"kind"`
Timeout string `json:"timeout"`
Status OpStatus `json:"status"`
}

// ToJSONObject serializes Operator as JSON object.
func (o *Operator) ToJSONObject() *OpObject {
var status OpStatus
if o.CheckSuccess() {
status = SUCCESS
} else if o.CheckTimeout() {
status = TIMEOUT
} else {
status = o.Status()
}

return &OpObject{
Desc: o.desc,
Brief: o.brief,
RegionID: o.regionID,
RegionEpoch: o.regionEpoch,
Kind: o.kind,
Timeout: o.timeout.String(),
Status: status,
}
}

// Desc returns the operator's short description.
func (o *Operator) Desc() string {
return o.desc
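For reference, a sketch of what the object form marshals to, assuming the json tags declared on OpObject above and that this snippet lives inside the pd module; the values are made up, and Kind/Status encode as their underlying numeric types unless custom marshalers exist for them:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/tikv/pd/pkg/schedule/operator"
)

func main() {
	// Illustrative values only; a real OpObject is produced by
	// Operator.ToJSONObject as shown in the diff above.
	obj := &operator.OpObject{
		Desc:     "balance-leader",
		Brief:    "transfer leader: store 3 to 1",
		RegionID: 101,
		Kind:     operator.OpLeader,
		Timeout:  "10m0s",
		Status:   operator.STARTED,
	}
	data, _ := json.MarshalIndent(obj, "", "  ")
	// Prints desc, brief, region_id, region_epoch (null here), kind,
	// timeout, and status.
	fmt.Println(string(data))
}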
33 changes: 33 additions & 0 deletions pkg/schedule/operator/operator_test.go
@@ -541,3 +541,36 @@ func (suite *operatorTestSuite) TestRecord() {
re.Equal(now, ob.FinishTime)
re.Greater(ob.duration.Seconds(), time.Second.Seconds())
}

func (suite *operatorTestSuite) TestToJSONObject() {
steps := []OpStep{
AddPeer{ToStore: 1, PeerID: 1},
TransferLeader{FromStore: 3, ToStore: 1},
RemovePeer{FromStore: 3},
}
op := suite.newTestOperator(101, OpLeader|OpRegion, steps...)
op.Start()
obj := op.ToJSONObject()
suite.Equal("test", obj.Desc)
suite.Equal("test", obj.Brief)
suite.Equal(uint64(101), obj.RegionID)
suite.Equal(OpLeader|OpRegion, obj.Kind)
suite.Equal("12m0s", obj.Timeout)
suite.Equal(STARTED, obj.Status)

// Test SUCCESS status.
region := suite.newTestRegion(1, 1, [2]uint64{1, 1}, [2]uint64{2, 2})
suite.Nil(op.Check(region))
suite.Equal(SUCCESS, op.Status())
obj = op.ToJSONObject()
suite.Equal(SUCCESS, obj.Status)

// Test TIMEOUT status.
steps = []OpStep{TransferLeader{FromStore: 2, ToStore: 1}}
op = suite.newTestOperator(1, OpLeader, steps...)
op.Start()
op.SetStatusReachTime(STARTED, op.GetStartTime().Add(-FastStepWaitTime-time.Second))
suite.True(op.CheckTimeout())
obj = op.ToJSONObject()
suite.Equal(TIMEOUT, obj.Status)
}
4 changes: 4 additions & 0 deletions pkg/schedule/prepare_checker.go
@@ -60,6 +60,10 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti
continue
}
storeID := store.GetID()
// It is used to avoid sudden scheduling when scheduling service is just started.
if len(collectWaitTime) > 0 && (float64(store.GetStoreStats().GetRegionCount())*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID))) {
return false
}
// For each store, the number of active regions should be more than total region of the store * collectFactor
if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) {
return false
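In effect, when a collectWaitTime is supplied the added guard requires that, for each store, the region count reported by the store's statistics, scaled by collectFactor, has already been observed through heartbeats (GetNotFromStorageRegionsCntByStore) before scheduling may start. A standalone sketch of that predicate, using a hypothetical collectFactor of 0.8 (the real constant is defined elsewhere in prepare_checker.go and is not shown in this diff):

package main

import "fmt"

// collectFactor here is a placeholder value for illustration only.
const collectFactor = 0.8

// storeReady mirrors the new check: a store blocks scheduling until the count
// of regions learned from heartbeats (notFromStorage) catches up with
// collectFactor times the region count reported in the store's stats.
func storeReady(statsRegionCount, notFromStorage int) bool {
	return float64(statsRegionCount)*collectFactor <= float64(notFromStorage)
}

func main() {
	fmt.Println(storeReady(1000, 700)) // false: 1000*0.8 = 800 > 700
	fmt.Println(storeReady(1000, 900)) // true: 800 <= 900
}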
11 changes: 3 additions & 8 deletions pkg/tso/keyspace_group_manager.go
@@ -22,7 +22,6 @@ import (
"net/http"
"regexp"
"sort"
"strings"
"sync"
"time"

@@ -485,8 +484,6 @@ func (kgm *KeyspaceGroupManager) GetServiceConfig() ServiceConfig {
// Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress}
// Value: discover.ServiceRegistryEntry
func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error {
tsoServiceEndKey := clientv3.GetPrefixRangeEnd(kgm.tsoServiceKey) + "/"

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
if err := json.Unmarshal(kv.Value, s); err != nil {
@@ -518,7 +515,7 @@ func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error {
putFn,
deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithRange(tsoServiceEndKey),
true, /* withPrefix */
)
kgm.tsoNodesWatcher.StartWatchLoop()
if err := kgm.tsoNodesWatcher.WaitLoad(); err != nil {
@@ -535,9 +532,7 @@ func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error {
// Value: endpoint.KeyspaceGroup
func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error {
rootPath := kgm.legacySvcRootPath
startKey := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(mcsutils.DefaultKeyspaceGroupID)}, "/")
endKey := strings.Join(
[]string{rootPath, clientv3.GetPrefixRangeEnd(endpoint.KeyspaceGroupIDPrefix())}, "/")
startKey := rootPath + "/" + endpoint.KeyspaceGroupIDPrefix()

defaultKGConfigured := false
putFn := func(kv *mvccpb.KeyValue) error {
@@ -577,7 +572,7 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error {
putFn,
deleteFn,
postEventsFn,
clientv3.WithRange(endKey),
true, /* withPrefix */
)
if kgm.loadKeyspaceGroupsTimeout > 0 {
kgm.groupWatcher.SetLoadTimeout(kgm.loadKeyspaceGroupsTimeout)
61 changes: 36 additions & 25 deletions pkg/utils/etcdutil/etcdutil.go
@@ -558,12 +558,10 @@ func InitOrGetClusterID(c *clientv3.Client, key string) (uint64, error) {
}

const (
defaultLoadDataFromEtcdTimeout = 30 * time.Second
defaultLoadFromEtcdRetryInterval = 200 * time.Millisecond
defaultLoadFromEtcdRetryTimes = int(defaultLoadDataFromEtcdTimeout / defaultLoadFromEtcdRetryInterval)
defaultLoadBatchSize = 400
defaultWatchChangeRetryInterval = 1 * time.Second
defaultForceLoadMinimalInterval = 200 * time.Millisecond
defaultLoadDataFromEtcdTimeout = 5 * time.Minute
defaultEtcdRetryInterval = time.Second
defaultLoadFromEtcdRetryTimes = 3
defaultLoadBatchSize = 400

// RequestProgressInterval is the interval to call RequestProgress for watcher.
RequestProgressInterval = 1 * time.Second
@@ -580,8 +578,8 @@ type LoopWatcher struct {

// key is the etcd key to watch.
key string
// opts is used to set etcd options.
opts []clientv3.OpOption
// isWithPrefix indicates whether the watcher is with prefix.
isWithPrefix bool

// forceLoadCh is used to force loading data from etcd.
forceLoadCh chan struct{}
@@ -623,7 +621,7 @@ func NewLoopWatcher(
preEventsFn func([]*clientv3.Event) error,
putFn, deleteFn func(*mvccpb.KeyValue) error,
postEventsFn func([]*clientv3.Event) error,
opts ...clientv3.OpOption,
isWithPrefix bool,
) *LoopWatcher {
return &LoopWatcher{
ctx: ctx,
@@ -638,12 +636,12 @@
deleteFn: deleteFn,
postEventsFn: postEventsFn,
preEventsFn: preEventsFn,
opts: opts,
isWithPrefix: isWithPrefix,
lastTimeForceLoad: time.Now(),
loadTimeout: defaultLoadDataFromEtcdTimeout,
loadRetryTimes: defaultLoadFromEtcdRetryTimes,
loadBatchSize: defaultLoadBatchSize,
watchChangeRetryInterval: defaultWatchChangeRetryInterval,
watchChangeRetryInterval: defaultEtcdRetryInterval,
}
}

@@ -689,23 +687,15 @@ func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 {
watchStartRevision int64
err error
)
ticker := time.NewTicker(defaultLoadFromEtcdRetryInterval)
ticker := time.NewTicker(defaultEtcdRetryInterval)
defer ticker.Stop()
ctx, cancel := context.WithTimeout(ctx, lw.loadTimeout)
defer cancel()

for i := 0; i < lw.loadRetryTimes; i++ {
failpoint.Inject("loadTemporaryFail", func(val failpoint.Value) {
if maxFailTimes, ok := val.(int); ok && i < maxFailTimes {
err = errors.New("fail to read from etcd")
failpoint.Continue()
}
})
failpoint.Inject("delayLoad", func(val failpoint.Value) {
if sleepIntervalSeconds, ok := val.(int); ok && sleepIntervalSeconds > 0 {
time.Sleep(time.Duration(sleepIntervalSeconds) * time.Second)
}
})
watchStartRevision, err = lw.load(ctx)
if err == nil {
break
@@ -754,7 +744,10 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
// make sure to wrap context with "WithRequireLeader".
watcherCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(ctx))
watcherCancel = cancel
opts := append(lw.opts, clientv3.WithRev(revision), clientv3.WithProgressNotify())
opts := []clientv3.OpOption{clientv3.WithRev(revision), clientv3.WithProgressNotify()}
if lw.isWithPrefix {
opts = append(opts, clientv3.WithPrefix())
}
done := make(chan struct{})
go grpcutil.CheckStream(watcherCtx, watcherCancel, done)
watchChan := watcher.Watch(watcherCtx, lw.key, opts...)
@@ -864,8 +857,14 @@
}

func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) {
ctx, cancel := context.WithTimeout(ctx, DefaultRequestTimeout)
ctx, cancel := context.WithTimeout(ctx, lw.loadTimeout)
defer cancel()
failpoint.Inject("delayLoad", func(val failpoint.Value) {
if sleepIntervalSeconds, ok := val.(int); ok && sleepIntervalSeconds > 0 {
time.Sleep(time.Duration(sleepIntervalSeconds) * time.Second)
}
})

startKey := lw.key
// If limit is 0, it means no limit.
// If limit is not 0, we need to add 1 to limit to get the next key.
@@ -883,10 +882,22 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error)
zap.String("key", lw.key), zap.Error(err))
}
}()

// In most cases, 'Get(foo, WithPrefix())' is equivalent to 'Get(foo, WithRange(GetPrefixRangeEnd(foo))'.
// However, when the startKey changes, the two are no longer equivalent.
// For example, the end key for 'WithRange(GetPrefixRangeEnd(foo))' is consistently 'fop'.
// But when using 'Get(foo1, WithPrefix())', the end key becomes 'foo2', not 'fop'.
// So, we use 'WithRange()' to avoid this problem.
opts := []clientv3.OpOption{
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
clientv3.WithLimit(limit)}
if lw.isWithPrefix {
opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(startKey)))
}

for {
// Sort by key to get the next key and we don't need to worry about the performance,
// Because the default sort is just SortByKey and SortAscend
opts := append(lw.opts, clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), clientv3.WithLimit(limit))
resp, err := clientv3.NewKV(lw.client).Get(ctx, startKey, opts...)
if err != nil {
log.Error("load failed in watch loop", zap.String("name", lw.name),
@@ -923,14 +934,14 @@ func (lw *LoopWatcher) ForceLoad() {
// Two-phase locking is also used to let most of the requests return directly without acquiring
// the write lock and causing the system to choke.
lw.forceLoadMu.RLock()
if time.Since(lw.lastTimeForceLoad) < defaultForceLoadMinimalInterval {
if time.Since(lw.lastTimeForceLoad) < defaultEtcdRetryInterval {
lw.forceLoadMu.RUnlock()
return
}
lw.forceLoadMu.RUnlock()

lw.forceLoadMu.Lock()
if time.Since(lw.lastTimeForceLoad) < defaultForceLoadMinimalInterval {
if time.Since(lw.lastTimeForceLoad) < defaultEtcdRetryInterval {
lw.forceLoadMu.Unlock()
return
}
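The new isWithPrefix flag replaces the caller-supplied clientv3 options: watch() expands it to clientv3.WithPrefix(), while load() expands it to clientv3.WithRange(clientv3.GetPrefixRangeEnd(startKey)) so the range end stays anchored to the original key even as the paginated load advances startKey. A small sketch of the range-end behavior described in the comment inside load():

package main

import (
	"fmt"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// GetPrefixRangeEnd increments the last byte of the key, so the end of
	// the range for "foo" stays "fop" regardless of how far pagination has
	// moved the start key; WithPrefix() recomputed from "foo1" would instead
	// stop at "foo2" and miss keys such as "foo3".
	fmt.Println(clientv3.GetPrefixRangeEnd("foo"))  // fop
	fmt.Println(clientv3.GetPrefixRangeEnd("foo1")) // foo2
}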
