Skip to content

Commit

Permalink
etcd_worker_test: fix test fail
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen authored and ti-chi-bot committed Nov 4, 2021
1 parent 135349b commit 50dbcfe
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
1 change: 1 addition & 0 deletions pkg/orchestrator/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
if !rl.Allow() {
continue
}
// it is safe that a batch of updates has been applied to worker.state before worker.reactor.Tick
nextState, err := worker.reactor.Tick(ctx, worker.state)
if err != nil {
if !cerrors.ErrReactorFinished.Equal(errors.Cause(err)) {
Expand Down
21 changes: 14 additions & 7 deletions pkg/orchestrator/etcd_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) {
type intReactorState struct {
val int
isUpdated bool
lastVal int
}

func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
Expand All @@ -289,6 +290,12 @@ func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) er
if err != nil {
log.Panic("intReactorState", zap.Error(err))
}
// As long as we can ensure that val is monotonically increasing,
// we can ensure that the linearizability of state changes
if s.lastVal > s.val {
log.Panic("linearizability check failed, lastVal must less than current val", zap.Int("lastVal", s.lastVal), zap.Int("val", s.val))
}
s.lastVal = s.val
s.isUpdated = !isInit
return nil
}
Expand All @@ -298,17 +305,17 @@ func (s *intReactorState) GetPatches() [][]DataPatch {
}

type linearizabilityReactor struct {
state *intReactorState
expected int
state *intReactorState
tickCount int
}

func (r *linearizabilityReactor) Tick(ctx context.Context, state ReactorState) (nextState ReactorState, err error) {
r.state = state.(*intReactorState)
if r.state.isUpdated {
if r.state.val != r.expected {
log.Panic("linearizability check failed", zap.Int("expected", r.expected), zap.Int("actual", r.state.val))
if r.state.val < r.tickCount {
log.Panic("linearizability check failed, val must larger than tickCount", zap.Int("expected", r.tickCount), zap.Int("actual", r.state.val))
}
r.expected++
r.tickCount++
}
if r.state.val == 1999 {
return r.state, cerrors.ErrReactorFinished
Expand All @@ -334,8 +341,8 @@ func (s *etcdWorkerSuite) TestLinearizability(c *check.C) {
}

reactor, err := NewEtcdWorker(cli0, testEtcdKeyPrefix+"/lin", &linearizabilityReactor{
state: nil,
expected: 999,
state: nil,
tickCount: 999,
}, &intReactorState{
val: 0,
isUpdated: false,
Expand Down

0 comments on commit 50dbcfe

Please sign in to comment.