diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index b5cfc4a6c42..102fe84db11 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -29,6 +29,7 @@ import ( "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "golang.org/x/time/rate" ) // EtcdWorker handles all interactions with Etcd @@ -121,6 +122,10 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, } lastReceivedEventTime := time.Now() + // tickRate represents the number of times EtcdWorker can tick + // the reactor per second + tickRate := time.Second / timerInterval + rl := rate.NewLimiter(rate.Limit(tickRate), 1) for { var response clientv3.WatchResponse select { @@ -137,7 +142,6 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, } case response = <-watchCh: // In this select case, we receive new events from Etcd, and call handleEvent if appropriate. - if err := response.Err(); err != nil { return errors.Trace(err) } @@ -184,6 +188,15 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, if err := worker.applyUpdates(); err != nil { return errors.Trace(err) } + + // If !rl.Allow(), skip this Tick to avoid etcd worker tick reactor too frequency. + // It make etcdWorker to batch etcd changed event in worker.state. + // The semantics of `ReactorState` requires that any implementation + // can batch updates internally. + if !rl.Allow() { + continue + } + // it is safe that a batch of updates has been applied to worker.state before worker.reactor.Tick nextState, err := worker.reactor.Tick(ctx, worker.state) if err != nil { if !cerrors.ErrReactorFinished.Equal(errors.Cause(err)) { diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go index dc78d0fce62..72cced6004f 100644 --- a/pkg/orchestrator/etcd_worker_test.go +++ b/pkg/orchestrator/etcd_worker_test.go @@ -281,6 +281,7 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) { type intReactorState struct { val int isUpdated bool + lastVal int } func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error { @@ -289,6 +290,12 @@ func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) er if err != nil { log.Panic("intReactorState", zap.Error(err)) } + // As long as we can ensure that val is monotonically increasing, + // we can ensure that the linearizability of state changes + if s.lastVal > s.val { + log.Panic("linearizability check failed, lastVal must less than current val", zap.Int("lastVal", s.lastVal), zap.Int("val", s.val)) + } + s.lastVal = s.val s.isUpdated = !isInit return nil } @@ -298,17 +305,17 @@ func (s *intReactorState) GetPatches() [][]DataPatch { } type linearizabilityReactor struct { - state *intReactorState - expected int + state *intReactorState + tickCount int } func (r *linearizabilityReactor) Tick(ctx context.Context, state ReactorState) (nextState ReactorState, err error) { r.state = state.(*intReactorState) if r.state.isUpdated { - if r.state.val != r.expected { - log.Panic("linearizability check failed", zap.Int("expected", r.expected), zap.Int("actual", r.state.val)) + if r.state.val < r.tickCount { + log.Panic("linearizability check failed, val must larger than tickCount", zap.Int("expected", r.tickCount), zap.Int("actual", r.state.val)) } - r.expected++ + r.tickCount++ } if r.state.val == 1999 { return r.state, cerrors.ErrReactorFinished @@ -334,8 +341,8 @@ func (s *etcdWorkerSuite) TestLinearizability(c *check.C) { } reactor, err := NewEtcdWorker(cli0, testEtcdKeyPrefix+"/lin", &linearizabilityReactor{ - state: nil, - expected: 999, + state: nil, + tickCount: 999, }, &intReactorState{ val: 0, isUpdated: false,