Skip to content

Commit

Permalink
Merge branch 'master' into prevent-scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Jan 17, 2024
2 parents da7d32c + 3a12148 commit e22d222
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 67 deletions.
3 changes: 1 addition & 2 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {

func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID uint64) {
tsoServiceKey := discovery.TSOPath(clusterID)
tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey)

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
Expand Down Expand Up @@ -249,7 +248,7 @@ func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID ui
putFn,
deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithRange(tsoServiceEndKey),
true, /* withPrefix */
)
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func (cw *Watcher) initializeConfigWatcher() error {
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
false, /* withPrefix */
)
cw.configWatcher.StartWatchLoop()
return cw.configWatcher.WaitLoad()
Expand Down Expand Up @@ -176,7 +177,7 @@ func (cw *Watcher) initializeTTLConfigWatcher() error {
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
true, /* withPrefix */
)
cw.ttlConfigWatcher.StartWatchLoop()
return cw.ttlConfigWatcher.WaitLoad()
Expand Down Expand Up @@ -217,7 +218,7 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
true, /* withPrefix */
)
cw.schedulerConfigWatcher.StartWatchLoop()
return cw.schedulerConfigWatcher.WaitLoad()
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (w *Watcher) initializeStoreWatcher() error {
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
true, /* withPrefix */
)
w.storeWatcher.StartWatchLoop()
return w.storeWatcher.WaitLoad()
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (rw *Watcher) initializeRuleWatcher() error {
preEventsFn,
putFn, deleteFn,
postEventsFn,
clientv3.WithPrefix(),
true, /* withPrefix */
)
rw.ruleWatcher.StartWatchLoop()
return rw.ruleWatcher.WaitLoad()
Expand Down Expand Up @@ -232,7 +232,7 @@ func (rw *Watcher) initializeRegionLabelWatcher() error {
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
true, /* withPrefix */
)
rw.labelWatcher.StartWatchLoop()
return rw.labelWatcher.WaitLoad()
Expand Down
11 changes: 3 additions & 8 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"net/http"
"regexp"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -485,8 +484,6 @@ func (kgm *KeyspaceGroupManager) GetServiceConfig() ServiceConfig {
// Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress}
// Value: discover.ServiceRegistryEntry
func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error {
tsoServiceEndKey := clientv3.GetPrefixRangeEnd(kgm.tsoServiceKey) + "/"

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
if err := json.Unmarshal(kv.Value, s); err != nil {
Expand Down Expand Up @@ -518,7 +515,7 @@ func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error {
putFn,
deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithRange(tsoServiceEndKey),
true, /* withPrefix */
)
kgm.tsoNodesWatcher.StartWatchLoop()
if err := kgm.tsoNodesWatcher.WaitLoad(); err != nil {
Expand All @@ -535,9 +532,7 @@ func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error {
// Value: endpoint.KeyspaceGroup
func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error {
rootPath := kgm.legacySvcRootPath
startKey := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(mcsutils.DefaultKeyspaceGroupID)}, "/")
endKey := strings.Join(
[]string{rootPath, clientv3.GetPrefixRangeEnd(endpoint.KeyspaceGroupIDPrefix())}, "/")
startKey := rootPath + "/" + endpoint.KeyspaceGroupIDPrefix()

defaultKGConfigured := false
putFn := func(kv *mvccpb.KeyValue) error {
Expand Down Expand Up @@ -577,7 +572,7 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error {
putFn,
deleteFn,
postEventsFn,
clientv3.WithRange(endKey),
true, /* withPrefix */
)
if kgm.loadKeyspaceGroupsTimeout > 0 {
kgm.groupWatcher.SetLoadTimeout(kgm.loadKeyspaceGroupsTimeout)
Expand Down
61 changes: 36 additions & 25 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,12 +558,10 @@ func InitOrGetClusterID(c *clientv3.Client, key string) (uint64, error) {
}

const (
defaultLoadDataFromEtcdTimeout = 30 * time.Second
defaultLoadFromEtcdRetryInterval = 200 * time.Millisecond
defaultLoadFromEtcdRetryTimes = int(defaultLoadDataFromEtcdTimeout / defaultLoadFromEtcdRetryInterval)
defaultLoadBatchSize = 400
defaultWatchChangeRetryInterval = 1 * time.Second
defaultForceLoadMinimalInterval = 200 * time.Millisecond
defaultLoadDataFromEtcdTimeout = 5 * time.Minute
defaultEtcdRetryInterval = time.Second
defaultLoadFromEtcdRetryTimes = 3
defaultLoadBatchSize = 400

// RequestProgressInterval is the interval to call RequestProgress for watcher.
RequestProgressInterval = 1 * time.Second
Expand All @@ -580,8 +578,8 @@ type LoopWatcher struct {

// key is the etcd key to watch.
key string
// opts is used to set etcd options.
opts []clientv3.OpOption
// isWithPrefix indicates whether the watcher covers every key prefixed with `key` rather than only the exact `key`.
isWithPrefix bool

// forceLoadCh is used to force loading data from etcd.
forceLoadCh chan struct{}
Expand Down Expand Up @@ -623,7 +621,7 @@ func NewLoopWatcher(
preEventsFn func([]*clientv3.Event) error,
putFn, deleteFn func(*mvccpb.KeyValue) error,
postEventsFn func([]*clientv3.Event) error,
opts ...clientv3.OpOption,
isWithPrefix bool,
) *LoopWatcher {
return &LoopWatcher{
ctx: ctx,
Expand All @@ -638,12 +636,12 @@ func NewLoopWatcher(
deleteFn: deleteFn,
postEventsFn: postEventsFn,
preEventsFn: preEventsFn,
opts: opts,
isWithPrefix: isWithPrefix,
lastTimeForceLoad: time.Now(),
loadTimeout: defaultLoadDataFromEtcdTimeout,
loadRetryTimes: defaultLoadFromEtcdRetryTimes,
loadBatchSize: defaultLoadBatchSize,
watchChangeRetryInterval: defaultWatchChangeRetryInterval,
watchChangeRetryInterval: defaultEtcdRetryInterval,
}
}

Expand Down Expand Up @@ -689,23 +687,15 @@ func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 {
watchStartRevision int64
err error
)
ticker := time.NewTicker(defaultLoadFromEtcdRetryInterval)
ticker := time.NewTicker(defaultEtcdRetryInterval)
defer ticker.Stop()
ctx, cancel := context.WithTimeout(ctx, lw.loadTimeout)
defer cancel()

for i := 0; i < lw.loadRetryTimes; i++ {
failpoint.Inject("loadTemporaryFail", func(val failpoint.Value) {
if maxFailTimes, ok := val.(int); ok && i < maxFailTimes {
err = errors.New("fail to read from etcd")
failpoint.Continue()
}
})
failpoint.Inject("delayLoad", func(val failpoint.Value) {
if sleepIntervalSeconds, ok := val.(int); ok && sleepIntervalSeconds > 0 {
time.Sleep(time.Duration(sleepIntervalSeconds) * time.Second)
}
})
watchStartRevision, err = lw.load(ctx)
if err == nil {
break
Expand Down Expand Up @@ -754,7 +744,10 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
// make sure to wrap context with "WithRequireLeader".
watcherCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(ctx))
watcherCancel = cancel
opts := append(lw.opts, clientv3.WithRev(revision), clientv3.WithProgressNotify())
opts := []clientv3.OpOption{clientv3.WithRev(revision), clientv3.WithProgressNotify()}
if lw.isWithPrefix {
opts = append(opts, clientv3.WithPrefix())
}
done := make(chan struct{})
go grpcutil.CheckStream(watcherCtx, watcherCancel, done)
watchChan := watcher.Watch(watcherCtx, lw.key, opts...)
Expand Down Expand Up @@ -864,8 +857,14 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
}

func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) {
ctx, cancel := context.WithTimeout(ctx, DefaultRequestTimeout)
ctx, cancel := context.WithTimeout(ctx, lw.loadTimeout)
defer cancel()
failpoint.Inject("delayLoad", func(val failpoint.Value) {
if sleepIntervalSeconds, ok := val.(int); ok && sleepIntervalSeconds > 0 {
time.Sleep(time.Duration(sleepIntervalSeconds) * time.Second)
}
})

startKey := lw.key
// If limit is 0, it means no limit.
// If limit is not 0, we need to add 1 to limit to get the next key.
Expand All @@ -883,10 +882,22 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error)
zap.String("key", lw.key), zap.Error(err))
}
}()

// In most cases, 'Get(foo, WithPrefix())' is equivalent to 'Get(foo, WithRange(GetPrefixRangeEnd(foo)))'.
// However, when the startKey changes, the two are no longer equivalent.
// For example, the end key for 'WithRange(GetPrefixRangeEnd(foo))' is consistently 'fop'.
// But when using 'Get(foo1, WithPrefix())', the end key becomes 'foo2', not 'fop'.
// So, we use 'WithRange()' to avoid this problem.
opts := []clientv3.OpOption{
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
clientv3.WithLimit(limit)}
if lw.isWithPrefix {
opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(startKey)))
}

for {
// Sort by key to get the next key. We don't need to worry about the performance
// because the default sort is already SortByKey and SortAscend.
opts := append(lw.opts, clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), clientv3.WithLimit(limit))
resp, err := clientv3.NewKV(lw.client).Get(ctx, startKey, opts...)
if err != nil {
log.Error("load failed in watch loop", zap.String("name", lw.name),
Expand Down Expand Up @@ -923,14 +934,14 @@ func (lw *LoopWatcher) ForceLoad() {
// Two-phase locking is also used to let most of the requests return directly without acquiring
// the write lock and causing the system to choke.
lw.forceLoadMu.RLock()
if time.Since(lw.lastTimeForceLoad) < defaultForceLoadMinimalInterval {
if time.Since(lw.lastTimeForceLoad) < defaultEtcdRetryInterval {
lw.forceLoadMu.RUnlock()
return
}
lw.forceLoadMu.RUnlock()

lw.forceLoadMu.Lock()
if time.Since(lw.lastTimeForceLoad) < defaultForceLoadMinimalInterval {
if time.Since(lw.lastTimeForceLoad) < defaultEtcdRetryInterval {
lw.forceLoadMu.Unlock()
return
}
Expand Down
Loading

0 comments on commit e22d222

Please sign in to comment.